Skip to content

Commit 2f02ca4

Browse files
apollo_consensus: move vote storage to SM; dedupe in SHC
1 parent acaae5f commit 2f02ca4

File tree

4 files changed

+165
-88
lines changed

4 files changed

+165
-88
lines changed

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 42 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,6 @@ pub(crate) struct SingleHeightConsensus {
137137
timeouts: TimeoutsConfig,
138138
state_machine: StateMachine,
139139
proposals: HashMap<Round, Option<ProposalCommitment>>,
140-
prevotes: HashMap<(Round, ValidatorId), Vote>,
141-
precommits: HashMap<(Round, ValidatorId), Vote>,
142140
last_prevote: Option<Vote>,
143141
last_precommit: Option<Vote>,
144142
}
@@ -161,8 +159,6 @@ impl SingleHeightConsensus {
161159
timeouts,
162160
state_machine,
163161
proposals: HashMap::new(),
164-
prevotes: HashMap::new(),
165-
precommits: HashMap::new(),
166162
last_prevote: None,
167163
last_precommit: None,
168164
}
@@ -254,7 +250,7 @@ impl SingleHeightConsensus {
254250
context.broadcast(last_vote.clone()).await?;
255251
Ok(ShcReturn::Tasks(vec![ShcTask::Prevote(
256252
self.timeouts.get_prevote_timeout(0),
257-
StateMachineEvent::Prevote(last_vote.clone()),
253+
StateMachineEvent::Prevote(vote),
258254
)]))
259255
}
260256
StateMachineEvent::Precommit(vote) => {
@@ -271,7 +267,7 @@ impl SingleHeightConsensus {
271267
context.broadcast(last_vote.clone()).await?;
272268
Ok(ShcReturn::Tasks(vec![ShcTask::Precommit(
273269
self.timeouts.get_precommit_timeout(0),
274-
StateMachineEvent::Precommit(last_vote.clone()),
270+
StateMachineEvent::Precommit(vote),
275271
)]))
276272
}
277273
StateMachineEvent::Proposal(proposal_id, round, valid_round) => {
@@ -349,29 +345,27 @@ impl SingleHeightConsensus {
349345
return Ok(ShcReturn::Tasks(Vec::new()));
350346
}
351347

352-
let (votes, sm_vote) = match vote.vote_type {
353-
VoteType::Prevote => (&mut self.prevotes, StateMachineEvent::Prevote(vote.clone())),
348+
// Check duplicates/conflicts from SM stored votes.
349+
let (votes_map, sm_vote) = match vote.vote_type {
350+
VoteType::Prevote => {
351+
(self.state_machine.prevotes_ref(), StateMachineEvent::Prevote(vote.clone()))
352+
}
354353
VoteType::Precommit => {
355-
(&mut self.precommits, StateMachineEvent::Precommit(vote.clone()))
354+
(self.state_machine.precommits_ref(), StateMachineEvent::Precommit(vote.clone()))
356355
}
357356
};
358-
359-
match votes.entry((vote.round, vote.voter)) {
360-
Entry::Vacant(entry) => {
361-
entry.insert(vote.clone());
362-
}
363-
Entry::Occupied(entry) => {
364-
let old = entry.get();
365-
if old.proposal_commitment != vote.proposal_commitment {
366-
warn!("Conflicting votes: old={:?}, new={:?}", old, vote);
367-
CONSENSUS_CONFLICTING_VOTES.increment(1);
368-
return Ok(ShcReturn::Tasks(Vec::new()));
369-
} else {
370-
// Replay, ignore.
371-
return Ok(ShcReturn::Tasks(Vec::new()));
372-
}
357+
if let Some((old_vote, _)) = votes_map.get(&(vote.round, vote.voter)) {
358+
if old_vote.proposal_commitment == vote.proposal_commitment {
359+
// Duplicate - ignore.
360+
return Ok(ShcReturn::Tasks(Vec::new()));
361+
} else {
362+
// Conflict - ignore and record.
363+
warn!("Conflicting votes: old={old_vote:?}, new={vote:?}");
364+
CONSENSUS_CONFLICTING_VOTES.increment(1);
365+
return Ok(ShcReturn::Tasks(Vec::new()));
373366
}
374367
}
368+
375369
info!("Accepting {:?}", vote);
376370
let height = self.state_machine.height();
377371
let leader_fn = |round: Round| -> ValidatorId { context.proposer(height, round) };
@@ -502,29 +496,24 @@ impl SingleHeightConsensus {
502496
context: &mut ContextT,
503497
vote: Vote,
504498
) -> Result<Vec<ShcTask>, ConsensusError> {
505-
let prevote_timeout = self.timeouts.get_prevote_timeout(vote.round);
506-
let precommit_timeout = self.timeouts.get_precommit_timeout(vote.round);
507-
let (votes, last_vote, task) = match vote.vote_type {
499+
let (last_vote, task) = match vote.vote_type {
508500
VoteType::Prevote => (
509-
&mut self.prevotes,
510501
&mut self.last_prevote,
511-
ShcTask::Prevote(prevote_timeout, StateMachineEvent::Prevote(vote.clone())),
502+
ShcTask::Prevote(
503+
self.timeouts.get_prevote_timeout(0),
504+
StateMachineEvent::Prevote(vote.clone()),
505+
),
512506
),
513507
VoteType::Precommit => (
514-
&mut self.precommits,
515508
&mut self.last_precommit,
516-
ShcTask::Precommit(precommit_timeout, StateMachineEvent::Precommit(vote.clone())),
509+
ShcTask::Precommit(
510+
self.timeouts.get_precommit_timeout(0),
511+
StateMachineEvent::Precommit(vote.clone()),
512+
),
517513
),
518514
};
519515
// Ensure the voter matches this node.
520516
assert_eq!(vote.voter, self.state_machine.validator_id());
521-
if let Some(old) =
522-
votes.insert((vote.round, self.state_machine.validator_id()), vote.clone())
523-
{
524-
return Err(ConsensusError::InternalInconsistency(format!(
525-
"State machine should not send repeat votes: old={old:?}, new={vote:?}"
526-
)));
527-
}
528517
*last_vote = match last_vote {
529518
None => Some(vote.clone()),
530519
Some(last_vote) if vote.round > last_vote.round => Some(vote.clone()),
@@ -569,18 +558,7 @@ impl SingleHeightConsensus {
569558
{block}"
570559
)));
571560
}
572-
let supporting_precommits: Vec<Vote> = self
573-
.validators
574-
.iter()
575-
.filter_map(|v| {
576-
let vote = self.precommits.get(&(round, *v))?;
577-
if vote.proposal_commitment == Some(proposal_id) {
578-
Some(vote.clone())
579-
} else {
580-
None
581-
}
582-
})
583-
.collect();
561+
let supporting_precommits = self.precommit_votes_for_value(round, Some(proposal_id));
584562

585563
// TODO(matan): Check actual weights.
586564
let vote_weight = u64::try_from(supporting_precommits.len())
@@ -596,4 +574,18 @@ impl SingleHeightConsensus {
596574
}
597575
Ok(ShcReturn::Decision(Decision { precommits: supporting_precommits, block }))
598576
}
577+
578+
fn precommit_votes_for_value(
579+
&self,
580+
round: Round,
581+
value: Option<ProposalCommitment>,
582+
) -> Vec<Vote> {
583+
self.state_machine
584+
.precommits_ref()
585+
.iter()
586+
.filter_map(|(&(r, _voter), (v, _w))| {
587+
if r == round && v.proposal_commitment == value { Some(v.clone()) } else { None }
588+
})
589+
.collect()
590+
}
599591
}

crates/apollo_consensus/src/single_height_consensus_test.rs

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ async fn rebroadcast_votes() {
334334
TIMEOUTS.clone(),
335335
);
336336

337-
context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID);
337+
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
338338
context.expect_build_proposal().times(1).returning(move |_, _| {
339339
let (block_sender, block_receiver) = oneshot::channel();
340340
block_sender.send(BLOCK.id).unwrap();
@@ -360,27 +360,72 @@ async fn rebroadcast_votes() {
360360
// 3 of 4 Prevotes is enough to send a Precommit.
361361
context
362362
.expect_broadcast()
363-
.times(2) // vote rebroadcast
364-
.withf(move |msg: &Vote| {
365-
msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)
366-
})
363+
.times(1) // just the first precommit at round 0.
364+
.withf(move |msg: &Vote| msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
367365
.returning(move |_| Ok(()));
368366
// The Node got a Prevote quorum.
369367
assert_eq!(
370368
shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await,
371369
Ok(ShcReturn::Tasks(vec![
372370
timeout_prevote_task(0),
371+
// initiate timeout for rebroadcast at round 0.
373372
precommit_task(Some(BLOCK.id.0), 0, *PROPOSER_ID),
374373
]))
375374
);
376-
// Re-broadcast vote.
375+
// Advance to the next round with NIL precommits.
376+
shc.handle_vote(&mut context, precommit(None, 0, 0, *VALIDATOR_ID_1)).await.unwrap();
377+
shc.handle_vote(&mut context, precommit(None, 0, 0, *VALIDATOR_ID_2)).await.unwrap();
378+
context.expect_repropose().returning(move |id, init| {
379+
assert_eq!(init.height, BlockNumber(0));
380+
assert_eq!(id, BLOCK.id);
381+
assert_eq!(init.round, 1);
382+
});
383+
context
384+
.expect_broadcast()
385+
.times(1)
386+
.withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID))
387+
.returning(move |_| Ok(()));
388+
shc.handle_event(&mut context, StateMachineEvent::TimeoutPrecommit(0)).await.unwrap();
389+
context
390+
.expect_broadcast()
391+
.times(2) // first: initial precommit vote at r1, second: rebroadcast at r1
392+
.withf(move |msg: &Vote| msg == &precommit(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID))
393+
.returning(move |_| Ok(()));
394+
assert_eq!(
395+
shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 1, *VALIDATOR_ID_2)).await,
396+
Ok(ShcReturn::Tasks(Vec::new()))
397+
);
398+
assert_eq!(
399+
shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 1, *VALIDATOR_ID_3)).await,
400+
Ok(ShcReturn::Tasks(vec![
401+
timeout_prevote_task(1),
402+
// initiate timeout for rebroadcast at round 1.
403+
precommit_task(Some(BLOCK.id.0), 1, *PROPOSER_ID),
404+
]))
405+
);
406+
407+
// Re-broadcast with older vote (round 0) - should be ignored (no broadcast, no task).
377408
assert_eq!(
378409
shc.handle_event(
379410
&mut context,
380411
StateMachineEvent::Precommit(precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
381412
)
382413
.await,
383-
Ok(ShcReturn::Tasks(vec![precommit_task(Some(BLOCK.id.0), 0, *PROPOSER_ID),]))
414+
Ok(ShcReturn::Tasks(Vec::new()))
415+
);
416+
417+
// Re-broadcast with current round (round 1) - should broadcast and schedule another timeout for
418+
// rebroadcast at round 1.
419+
assert_eq!(
420+
shc.handle_event(
421+
&mut context,
422+
StateMachineEvent::Precommit(precommit(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID))
423+
)
424+
.await,
425+
Ok(ShcReturn::Tasks(vec![ShcTask::Precommit(
426+
TIMEOUTS.get_precommit_timeout(0),
427+
StateMachineEvent::Precommit(precommit(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID)),
428+
)]))
384429
);
385430
}
386431

crates/apollo_consensus/src/state_machine.rs

Lines changed: 49 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ pub(crate) struct StateMachine {
7878
is_observer: bool,
7979
// {round: (proposal_id, valid_round)}
8080
proposals: HashMap<Round, (Option<ProposalCommitment>, Option<Round>)>,
81-
// {round: {proposal_id: vote_count}
82-
prevotes: HashMap<Round, HashMap<Option<ProposalCommitment>, u32>>,
83-
precommits: HashMap<Round, HashMap<Option<ProposalCommitment>, u32>>,
81+
// {(round, voter): (vote, weight)}
82+
prevotes: HashMap<(Round, ValidatorId), (Vote, u32)>,
83+
precommits: HashMap<(Round, ValidatorId), (Vote, u32)>,
8484
// When true, the state machine will wait for a GetProposal event, buffering all other input
8585
// events in `events_queue`.
8686
awaiting_get_proposal: bool,
@@ -145,6 +145,14 @@ impl StateMachine {
145145
self.height
146146
}
147147

148+
pub(crate) fn prevotes_ref(&self) -> &HashMap<(Round, ValidatorId), (Vote, u32)> {
149+
&self.prevotes
150+
}
151+
152+
pub(crate) fn precommits_ref(&self) -> &HashMap<(Round, ValidatorId), (Vote, u32)> {
153+
&self.precommits
154+
}
155+
148156
fn make_self_vote(
149157
&self,
150158
vote_type: VoteType,
@@ -328,15 +336,15 @@ impl StateMachine {
328336
where
329337
LeaderFn: Fn(Round) -> ValidatorId,
330338
{
331-
let prevote_count = self
332-
.prevotes
333-
.entry(vote.round)
334-
.or_default()
335-
.entry(vote.proposal_commitment)
336-
.or_insert(0);
337-
// TODO(matan): Use variable weight.
338-
*prevote_count += 1;
339-
self.map_round_to_upons(vote.round, leader_fn)
339+
let round = vote.round;
340+
let voter = vote.voter;
341+
let inserted = self.prevotes.insert((round, voter), (vote, 1)).is_none();
342+
assert!(
343+
inserted,
344+
"SHC should handle conflicts & replays: duplicate prevote for round={round}, \
345+
voter={voter}",
346+
);
347+
self.map_round_to_upons(round, leader_fn)
340348
}
341349

342350
pub(crate) fn handle_timeout_prevote(&mut self, round: u32) -> VecDeque<StateMachineEvent> {
@@ -360,15 +368,15 @@ impl StateMachine {
360368
where
361369
LeaderFn: Fn(Round) -> ValidatorId,
362370
{
363-
let precommit_count = self
364-
.precommits
365-
.entry(vote.round)
366-
.or_default()
367-
.entry(vote.proposal_commitment)
368-
.or_insert(0);
369-
// TODO(matan): Use variable weight.
370-
*precommit_count += 1;
371-
self.map_round_to_upons(vote.round, leader_fn)
371+
let round = vote.round;
372+
let voter = vote.voter;
373+
let inserted = self.precommits.insert((round, voter), (vote, 1)).is_none();
374+
assert!(
375+
inserted,
376+
"SHC should handle conflicts & replays: duplicate precommit for round={round}, \
377+
voter={voter}"
378+
);
379+
self.map_round_to_upons(round, leader_fn)
372380
}
373381

374382
fn handle_timeout_precommit<LeaderFn>(
@@ -640,24 +648,36 @@ impl StateMachine {
640648

641649
fn round_has_enough_votes(
642650
&self,
643-
votes: &HashMap<u32, HashMap<Option<ProposalCommitment>, u32>>,
651+
votes: &HashMap<(Round, ValidatorId), (Vote, u32)>,
644652
round: u32,
645653
threshold: &VotesThreshold,
646654
) -> bool {
647-
threshold
648-
.is_met(votes.get(&round).map_or(0, |v| v.values().sum()).into(), self.total_weight)
655+
let weight_sum = votes
656+
.iter()
657+
.filter_map(
658+
|(&(r, _voter), (_v, w))| if r == round { Some(u64::from(*w)) } else { None },
659+
)
660+
.sum();
661+
threshold.is_met(weight_sum, self.total_weight)
649662
}
650663

651664
fn value_has_enough_votes(
652665
&self,
653-
votes: &HashMap<u32, HashMap<Option<ProposalCommitment>, u32>>,
666+
votes: &HashMap<(Round, ValidatorId), (Vote, u32)>,
654667
round: u32,
655668
value: &Option<ProposalCommitment>,
656669
threshold: &VotesThreshold,
657670
) -> bool {
658-
threshold.is_met(
659-
votes.get(&round).map_or(0, |v| *v.get(value).unwrap_or(&0)).into(),
660-
self.total_weight,
661-
)
671+
let weight_sum = votes
672+
.iter()
673+
.filter_map(|(&(r, _voter), (v, w))| {
674+
if r == round && &v.proposal_commitment == value {
675+
Some(u64::from(*w))
676+
} else {
677+
None
678+
}
679+
})
680+
.sum();
681+
threshold.is_met(weight_sum, self.total_weight)
662682
}
663683
}

0 commit comments

Comments
 (0)