Skip to content

Commit 488fb97

Browse files
apollo_consensus: prevent dup/conflict vote queuing during awaiting_finished_building (#10616)
1 parent ac89330 commit 488fb97

File tree

3 files changed

+124
-32
lines changed

3 files changed

+124
-32
lines changed

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ mod single_height_consensus_test;
1010

1111
use std::collections::{HashSet, VecDeque};
1212

13+
use crate::state_machine::VoteStatus;
1314
const REBROADCAST_LOG_PERIOD_SECS: u64 = 10;
15+
const DUPLICATE_VOTE_LOG_PERIOD_SECS: u64 = 10;
1416

1517
use apollo_consensus_config::config::TimeoutsConfig;
1618
use apollo_infra_utils::trace_every_n_sec;
@@ -274,29 +276,32 @@ impl SingleHeightConsensus {
274276
return Ok(ShcReturn::Requests(VecDeque::new()));
275277
}
276278

277-
// Check duplicates/conflicts from SM stored votes.
278-
let (votes_map, sm_vote) = match vote.vote_type {
279-
VoteType::Prevote => {
280-
(self.state_machine.prevotes_ref(), StateMachineEvent::Prevote(vote.clone()))
281-
}
282-
VoteType::Precommit => {
283-
(self.state_machine.precommits_ref(), StateMachineEvent::Precommit(vote.clone()))
284-
}
285-
};
286-
if let Some((old_vote, _)) = votes_map.get(&(vote.round, vote.voter)) {
287-
if old_vote.proposal_commitment == vote.proposal_commitment {
279+
// Check if vote has already been received.
280+
match self.state_machine.received_vote(&vote) {
281+
VoteStatus::Duplicate => {
288282
// Duplicate - ignore.
283+
trace_every_n_sec!(
284+
DUPLICATE_VOTE_LOG_PERIOD_SECS,
285+
"Ignoring duplicate vote: {vote:?}"
286+
);
289287
return Ok(ShcReturn::Requests(VecDeque::new()));
290-
} else {
288+
}
289+
VoteStatus::Conflict(old_vote, new_vote) => {
291290
// Conflict - ignore and record.
292-
warn!("Conflicting votes: old={old_vote:?}, new={vote:?}");
291+
warn!("Conflicting votes: old={old_vote:?}, new={new_vote:?}");
293292
CONSENSUS_CONFLICTING_VOTES.increment(1);
294293
return Ok(ShcReturn::Requests(VecDeque::new()));
295294
}
295+
VoteStatus::New => {
296+
// Vote is new, proceed to process it.
297+
}
296298
}
297299

298300
info!("Accepting {:?}", vote);
299-
// TODO(Asmaa): consider calling handle_prevote/precommit instead of sending the vote event.
301+
let sm_vote = match vote.vote_type {
302+
VoteType::Prevote => StateMachineEvent::Prevote(vote),
303+
VoteType::Precommit => StateMachineEvent::Precommit(vote),
304+
};
300305
let requests = self.state_machine.handle_event(sm_vote, leader_fn);
301306
self.handle_state_machine_requests(requests)
302307
}

crates/apollo_consensus/src/single_height_consensus_test.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,3 +361,52 @@ fn repropose() {
361361
assert!(reqs.is_empty());
362362
});
363363
}
364+
365+
#[tokio::test]
366+
async fn duplicate_votes_during_awaiting_finished_building_are_ignored() {
367+
// This test verifies that receiving 3 identical prevotes during awaiting_finished_building
368+
// results in only one vote being processed, so no TimeoutPrevote is triggered.
369+
let mut shc = SingleHeightConsensus::new(
370+
BlockNumber(0),
371+
false,
372+
*PROPOSER_ID,
373+
VALIDATORS.to_vec(),
374+
QuorumType::Byzantine,
375+
TIMEOUTS.clone(),
376+
);
377+
let leader_fn = |_round| -> ValidatorId { *PROPOSER_ID };
378+
let ret = shc.start(&leader_fn).unwrap();
379+
assert_matches!(ret, ShcReturn::Requests(mut reqs) => {
380+
assert_matches!(reqs.pop_front(), Some(SMRequest::StartBuildProposal(ROUND_0)));
381+
assert!(reqs.is_empty());
382+
});
383+
384+
// Receive enough identical prevotes during awaiting_finished_building to trigger Timeout
385+
// (if they weren't duplicates)
386+
let duplicate_vote = prevote(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *VALIDATOR_ID_1);
387+
388+
// First vote gets queued
389+
assert_matches!(
390+
shc.handle_vote(&leader_fn, duplicate_vote.clone()).unwrap(),
391+
ShcReturn::Requests(reqs) if reqs.is_empty()
392+
);
393+
394+
// Remaining votes are duplicates - should be ignored
395+
for _ in 1..VALIDATORS.len() {
396+
assert_matches!(
397+
shc.handle_vote(&leader_fn, duplicate_vote.clone()).unwrap(),
398+
ShcReturn::Requests(reqs) if reqs.is_empty()
399+
);
400+
}
401+
402+
// Finish building - processes the queue
403+
// Only one vote was queued (duplicates were ignored), so no TimeoutPrevote should be triggered,
404+
// only a broadcast vote
405+
let ret = shc
406+
.handle_event(&leader_fn, StateMachineEvent::FinishedBuilding(Some(BLOCK.id), ROUND_0))
407+
.unwrap();
408+
assert_matches!(ret, ShcReturn::Requests(mut reqs) => {
409+
assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Prevote && v.round == ROUND_0);
410+
assert!(reqs.is_empty());
411+
});
412+
}

crates/apollo_consensus/src/state_machine.rs

Lines changed: 56 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,17 @@ pub(crate) enum Step {
8383
Precommit,
8484
}
8585

86+
/// Status of a vote when checking if it has been received.
87+
#[derive(Debug, PartialEq)]
88+
pub(crate) enum VoteStatus {
89+
/// Vote is new and has not been received yet.
90+
New,
91+
/// Vote is a duplicate (same proposal_commitment) - already received or queued.
92+
Duplicate,
93+
/// Vote conflicts with an existing vote (different proposal_commitment).
94+
Conflict(Vote, Vote),
95+
}
96+
8697
/// State Machine. Major assumptions:
8798
/// 1. SHC handles: authentication, replays, and conflicts.
8899
/// 2. SM must handle "out of order" messages (E.g. vote arrives before proposal).
@@ -167,10 +178,6 @@ impl StateMachine {
167178
self.height
168179
}
169180

170-
pub(crate) fn prevotes_ref(&self) -> &VotesMap {
171-
&self.prevotes
172-
}
173-
174181
pub(crate) fn precommits_ref(&self) -> &VotesMap {
175182
&self.precommits
176183
}
@@ -191,6 +198,44 @@ impl StateMachine {
191198
self.last_self_precommit.clone()
192199
}
193200

201+
/// Check if a vote has already been received (either in the vote maps or queued).
202+
/// Returns the status of the vote: NotReceived, Duplicate, or Conflict.
203+
pub(crate) fn received_vote(&self, vote: &Vote) -> VoteStatus {
204+
let determine_status = |old: &Vote, new: &Vote| {
205+
if old.proposal_commitment == new.proposal_commitment {
206+
VoteStatus::Duplicate
207+
} else {
208+
VoteStatus::Conflict(old.clone(), new.clone())
209+
}
210+
};
211+
212+
// Check Map
213+
let key = (vote.round, vote.voter);
214+
let map_entry = match vote.vote_type {
215+
VoteType::Prevote => self.prevotes.get(&key),
216+
VoteType::Precommit => self.precommits.get(&key),
217+
};
218+
219+
if let Some((old_vote, _)) = map_entry {
220+
return determine_status(old_vote, vote);
221+
}
222+
223+
// Check Queue
224+
for event in &self.events_queue {
225+
let queued_vote = match (event, vote.vote_type) {
226+
(StateMachineEvent::Prevote(v), VoteType::Prevote) => v,
227+
(StateMachineEvent::Precommit(v), VoteType::Precommit) => v,
228+
_ => continue,
229+
};
230+
231+
if queued_vote.round == vote.round && queued_vote.voter == vote.voter {
232+
return determine_status(queued_vote, vote);
233+
}
234+
}
235+
236+
VoteStatus::New
237+
}
238+
194239
fn make_self_vote(
195240
&mut self,
196241
vote_type: VoteType,
@@ -274,10 +319,7 @@ impl StateMachine {
274319
self.handle_enqueued_events(leader_fn)
275320
}
276321

277-
pub(crate) fn handle_enqueued_events<LeaderFn>(
278-
&mut self,
279-
leader_fn: &LeaderFn,
280-
) -> VecDeque<SMRequest>
322+
fn handle_enqueued_events<LeaderFn>(&mut self, leader_fn: &LeaderFn) -> VecDeque<SMRequest>
281323
where
282324
LeaderFn: Fn(Round) -> ValidatorId,
283325
{
@@ -311,7 +353,7 @@ impl StateMachine {
311353
output_requests
312354
}
313355

314-
pub(crate) fn handle_event_internal<LeaderFn>(
356+
fn handle_event_internal<LeaderFn>(
315357
&mut self,
316358
event: StateMachineEvent,
317359
leader_fn: &LeaderFn,
@@ -344,7 +386,7 @@ impl StateMachine {
344386
}
345387
}
346388

347-
pub(crate) fn handle_finished_building<LeaderFn>(
389+
fn handle_finished_building<LeaderFn>(
348390
&mut self,
349391
proposal_id: Option<ProposalCommitment>,
350392
round: u32,
@@ -362,7 +404,7 @@ impl StateMachine {
362404
self.map_round_to_upons(round, leader_fn)
363405
}
364406

365-
pub(crate) fn handle_finished_validation<LeaderFn>(
407+
fn handle_finished_validation<LeaderFn>(
366408
&mut self,
367409
proposal_id: Option<ProposalCommitment>,
368410
round: u32,
@@ -377,7 +419,7 @@ impl StateMachine {
377419
self.map_round_to_upons(round, leader_fn)
378420
}
379421

380-
pub(crate) fn handle_timeout_propose(&mut self, round: u32) -> VecDeque<SMRequest> {
422+
fn handle_timeout_propose(&mut self, round: u32) -> VecDeque<SMRequest> {
381423
if self.step != Step::Propose || round != self.round {
382424
return VecDeque::new();
383425
};
@@ -392,11 +434,7 @@ impl StateMachine {
392434
}
393435

394436
// A prevote from a peer node.
395-
pub(crate) fn handle_prevote<LeaderFn>(
396-
&mut self,
397-
vote: Vote,
398-
leader_fn: &LeaderFn,
399-
) -> VecDeque<SMRequest>
437+
fn handle_prevote<LeaderFn>(&mut self, vote: Vote, leader_fn: &LeaderFn) -> VecDeque<SMRequest>
400438
where
401439
LeaderFn: Fn(Round) -> ValidatorId,
402440
{
@@ -411,7 +449,7 @@ impl StateMachine {
411449
self.map_round_to_upons(round, leader_fn)
412450
}
413451

414-
pub(crate) fn handle_timeout_prevote(&mut self, round: u32) -> VecDeque<SMRequest> {
452+
fn handle_timeout_prevote(&mut self, round: u32) -> VecDeque<SMRequest> {
415453
if self.step != Step::Prevote || round != self.round {
416454
return VecDeque::new();
417455
};

0 commit comments

Comments
 (0)