Skip to content

Commit 4771243

Browse files
apollo_consensus: move vote storage to SM; dedupe in SHC
1 parent 8629853 commit 4771243

File tree

4 files changed

+167
-90
lines changed

4 files changed

+167
-90
lines changed

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 42 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,6 @@ pub(crate) struct SingleHeightConsensus {
139139
timeouts: TimeoutsConfig,
140140
state_machine: StateMachine,
141141
proposals: HashMap<Round, Option<ProposalCommitment>>,
142-
prevotes: HashMap<(Round, ValidatorId), Vote>,
143-
precommits: HashMap<(Round, ValidatorId), Vote>,
144142
last_prevote: Option<Vote>,
145143
last_precommit: Option<Vote>,
146144
height_voted_storage: Arc<Mutex<dyn HeightVotedStorageTrait>>,
@@ -165,8 +163,6 @@ impl SingleHeightConsensus {
165163
timeouts,
166164
state_machine,
167165
proposals: HashMap::new(),
168-
prevotes: HashMap::new(),
169-
precommits: HashMap::new(),
170166
last_prevote: None,
171167
last_precommit: None,
172168
height_voted_storage,
@@ -259,7 +255,7 @@ impl SingleHeightConsensus {
259255
context.broadcast(last_vote.clone()).await?;
260256
Ok(ShcReturn::Tasks(vec![ShcTask::Prevote(
261257
self.timeouts.get_prevote_timeout(0),
262-
StateMachineEvent::Prevote(last_vote.clone()),
258+
StateMachineEvent::Prevote(vote),
263259
)]))
264260
}
265261
StateMachineEvent::Precommit(vote) => {
@@ -276,7 +272,7 @@ impl SingleHeightConsensus {
276272
context.broadcast(last_vote.clone()).await?;
277273
Ok(ShcReturn::Tasks(vec![ShcTask::Precommit(
278274
self.timeouts.get_precommit_timeout(0),
279-
StateMachineEvent::Precommit(last_vote.clone()),
275+
StateMachineEvent::Precommit(vote),
280276
)]))
281277
}
282278
StateMachineEvent::Proposal(proposal_id, round, valid_round) => {
@@ -354,29 +350,27 @@ impl SingleHeightConsensus {
354350
return Ok(ShcReturn::Tasks(Vec::new()));
355351
}
356352

357-
let (votes, sm_vote) = match vote.vote_type {
358-
VoteType::Prevote => (&mut self.prevotes, StateMachineEvent::Prevote(vote.clone())),
353+
// Check duplicates/conflicts from SM stored votes.
354+
let (votes_map, sm_vote) = match vote.vote_type {
355+
VoteType::Prevote => {
356+
(self.state_machine.prevotes_ref(), StateMachineEvent::Prevote(vote.clone()))
357+
}
359358
VoteType::Precommit => {
360-
(&mut self.precommits, StateMachineEvent::Precommit(vote.clone()))
359+
(self.state_machine.precommits_ref(), StateMachineEvent::Precommit(vote.clone()))
361360
}
362361
};
363-
364-
match votes.entry((vote.round, vote.voter)) {
365-
Entry::Vacant(entry) => {
366-
entry.insert(vote.clone());
367-
}
368-
Entry::Occupied(entry) => {
369-
let old = entry.get();
370-
if old.proposal_commitment != vote.proposal_commitment {
371-
warn!("Conflicting votes: old={:?}, new={:?}", old, vote);
372-
CONSENSUS_CONFLICTING_VOTES.increment(1);
373-
return Ok(ShcReturn::Tasks(Vec::new()));
374-
} else {
375-
// Replay, ignore.
376-
return Ok(ShcReturn::Tasks(Vec::new()));
377-
}
362+
if let Some((old_vote, _)) = votes_map.get(&(vote.round, vote.voter)) {
363+
if old_vote.proposal_commitment == vote.proposal_commitment {
364+
// Duplicate - ignore.
365+
return Ok(ShcReturn::Tasks(Vec::new()));
366+
} else {
367+
// Conflict - ignore and record.
368+
warn!("Conflicting votes: old={old_vote:?}, new={vote:?}");
369+
CONSENSUS_CONFLICTING_VOTES.increment(1);
370+
return Ok(ShcReturn::Tasks(Vec::new()));
378371
}
379372
}
373+
380374
info!("Accepting {:?}", vote);
381375
let height = self.state_machine.height();
382376
let leader_fn = |round: Round| -> ValidatorId { context.proposer(height, round) };
@@ -507,29 +501,24 @@ impl SingleHeightConsensus {
507501
context: &mut ContextT,
508502
vote: Vote,
509503
) -> Result<Vec<ShcTask>, ConsensusError> {
510-
let prevote_timeout = self.timeouts.get_prevote_timeout(vote.round);
511-
let precommit_timeout = self.timeouts.get_precommit_timeout(vote.round);
512-
let (votes, last_vote, task) = match vote.vote_type {
504+
let (last_vote, task) = match vote.vote_type {
513505
VoteType::Prevote => (
514-
&mut self.prevotes,
515506
&mut self.last_prevote,
516-
ShcTask::Prevote(prevote_timeout, StateMachineEvent::Prevote(vote.clone())),
507+
ShcTask::Prevote(
508+
self.timeouts.get_prevote_timeout(0),
509+
StateMachineEvent::Prevote(vote.clone()),
510+
),
517511
),
518512
VoteType::Precommit => (
519-
&mut self.precommits,
520513
&mut self.last_precommit,
521-
ShcTask::Precommit(precommit_timeout, StateMachineEvent::Precommit(vote.clone())),
514+
ShcTask::Precommit(
515+
self.timeouts.get_precommit_timeout(0),
516+
StateMachineEvent::Precommit(vote.clone()),
517+
),
522518
),
523519
};
524520
// Ensure the voter matches this node.
525521
assert_eq!(vote.voter, self.state_machine.validator_id());
526-
if let Some(old) =
527-
votes.insert((vote.round, self.state_machine.validator_id()), vote.clone())
528-
{
529-
return Err(ConsensusError::InternalInconsistency(format!(
530-
"State machine should not send repeat votes: old={old:?}, new={vote:?}"
531-
)));
532-
}
533522
*last_vote = match last_vote {
534523
None => Some(vote.clone()),
535524
Some(last_vote) if vote.round > last_vote.round => Some(vote.clone()),
@@ -583,18 +572,7 @@ impl SingleHeightConsensus {
583572
{block}"
584573
)));
585574
}
586-
let supporting_precommits: Vec<Vote> = self
587-
.validators
588-
.iter()
589-
.filter_map(|v| {
590-
let vote = self.precommits.get(&(round, *v))?;
591-
if vote.proposal_commitment == Some(proposal_id) {
592-
Some(vote.clone())
593-
} else {
594-
None
595-
}
596-
})
597-
.collect();
575+
let supporting_precommits = self.precommit_votes_for_value(round, Some(proposal_id));
598576

599577
// TODO(matan): Check actual weights.
600578
let vote_weight = u64::try_from(supporting_precommits.len())
@@ -610,4 +588,18 @@ impl SingleHeightConsensus {
610588
}
611589
Ok(ShcReturn::Decision(Decision { precommits: supporting_precommits, block }))
612590
}
591+
592+
fn precommit_votes_for_value(
593+
&self,
594+
round: Round,
595+
value: Option<ProposalCommitment>,
596+
) -> Vec<Vote> {
597+
self.state_machine
598+
.precommits_ref()
599+
.iter()
600+
.filter_map(|(&(r, _voter), (v, _w))| {
601+
if r == round && v.proposal_commitment == value { Some(v.clone()) } else { None }
602+
})
603+
.collect()
604+
}
613605
}

crates/apollo_consensus/src/single_height_consensus_test.rs

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ fn get_proposal_init_for_height(height: BlockNumber) -> ProposalInit {
5454
}
5555

5656
fn prevote_task(block_felt: Option<Felt>, height: u64, round: u32, voter: ValidatorId) -> ShcTask {
57-
let duration = TIMEOUTS.get_prevote_timeout(round);
57+
let duration = TIMEOUTS.get_prevote_timeout(0);
5858
ShcTask::Prevote(
5959
duration,
6060
StateMachineEvent::Prevote(prevote(block_felt, height, round, voter)),
@@ -67,7 +67,7 @@ fn precommit_task(
6767
round: u32,
6868
voter: ValidatorId,
6969
) -> ShcTask {
70-
let duration = TIMEOUTS.get_precommit_timeout(round);
70+
let duration = TIMEOUTS.get_precommit_timeout(0);
7171
ShcTask::Precommit(
7272
duration,
7373
StateMachineEvent::Precommit(precommit(block_felt, height, round, voter)),
@@ -357,7 +357,7 @@ async fn rebroadcast_votes() {
357357
Arc::new(Mutex::new(NoOpHeightVotedStorage)),
358358
);
359359

360-
context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID);
360+
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
361361
context.expect_build_proposal().times(1).returning(move |_, _| {
362362
let (block_sender, block_receiver) = oneshot::channel();
363363
block_sender.send(BLOCK.id).unwrap();
@@ -383,27 +383,72 @@ async fn rebroadcast_votes() {
383383
// 3 of 4 Prevotes is enough to send a Precommit.
384384
context
385385
.expect_broadcast()
386-
.times(2) // vote rebroadcast
387-
.withf(move |msg: &Vote| {
388-
msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)
389-
})
386+
.times(1) // just the first precommit at round 0.
387+
.withf(move |msg: &Vote| msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
390388
.returning(move |_| Ok(()));
391389
// The Node got a Prevote quorum.
392390
assert_eq!(
393391
shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await,
394392
Ok(ShcReturn::Tasks(vec![
395393
timeout_prevote_task(0),
394+
// initiate timeout for rebroadcast at round 0.
396395
precommit_task(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID),
397396
]))
398397
);
399-
// Re-broadcast vote.
398+
// Advance to the next round with NIL precommits.
399+
shc.handle_vote(&mut context, precommit(None, 0, 0, *VALIDATOR_ID_1)).await.unwrap();
400+
shc.handle_vote(&mut context, precommit(None, 0, 0, *VALIDATOR_ID_2)).await.unwrap();
401+
context.expect_repropose().returning(move |id, init| {
402+
assert_eq!(init.height, BlockNumber(0));
403+
assert_eq!(id, BLOCK.id);
404+
assert_eq!(init.round, 1);
405+
});
406+
context
407+
.expect_broadcast()
408+
.times(1)
409+
.withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID))
410+
.returning(move |_| Ok(()));
411+
shc.handle_event(&mut context, StateMachineEvent::TimeoutPrecommit(0)).await.unwrap();
412+
context
413+
.expect_broadcast()
414+
.times(2) // first: initial precommit vote at r1, second: rebroadcast at r1
415+
.withf(move |msg: &Vote| msg == &precommit(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID))
416+
.returning(move |_| Ok(()));
417+
assert_eq!(
418+
shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 1, *VALIDATOR_ID_2)).await,
419+
Ok(ShcReturn::Tasks(Vec::new()))
420+
);
421+
assert_eq!(
422+
shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 1, *VALIDATOR_ID_3)).await,
423+
Ok(ShcReturn::Tasks(vec![
424+
timeout_prevote_task(1),
425+
// initiate timeout for rebroadcast at round 1.
426+
precommit_task(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID),
427+
]))
428+
);
429+
430+
// Re-broadcast with older vote (round 0) - should be ignored (no broadcast, no task).
400431
assert_eq!(
401432
shc.handle_event(
402433
&mut context,
403434
StateMachineEvent::Precommit(precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
404435
)
405436
.await,
406-
Ok(ShcReturn::Tasks(vec![precommit_task(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID),]))
437+
Ok(ShcReturn::Tasks(Vec::new()))
438+
);
439+
440+
// Re-broadcast with current round (round 1) - should broadcast and schedule another timeout for
441+
// rebroadcast at round 1.
442+
assert_eq!(
443+
shc.handle_event(
444+
&mut context,
445+
StateMachineEvent::Precommit(precommit(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID))
446+
)
447+
.await,
448+
Ok(ShcReturn::Tasks(vec![ShcTask::Precommit(
449+
TIMEOUTS.get_precommit_timeout(0),
450+
StateMachineEvent::Precommit(precommit(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID)),
451+
)]))
407452
);
408453
}
409454

crates/apollo_consensus/src/state_machine.rs

Lines changed: 49 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ pub(crate) struct StateMachine {
7878
is_observer: bool,
7979
// {round: (proposal_id, valid_round)}
8080
proposals: HashMap<Round, (Option<ProposalCommitment>, Option<Round>)>,
81-
// {round: {proposal_id: vote_count}
82-
prevotes: HashMap<Round, HashMap<Option<ProposalCommitment>, u32>>,
83-
precommits: HashMap<Round, HashMap<Option<ProposalCommitment>, u32>>,
81+
// {(round, voter): (vote, weight)}
82+
prevotes: HashMap<(Round, ValidatorId), (Vote, u32)>,
83+
precommits: HashMap<(Round, ValidatorId), (Vote, u32)>,
8484
// When true, the state machine will wait for a GetProposal event, buffering all other input
8585
// events in `events_queue`.
8686
awaiting_get_proposal: bool,
@@ -145,6 +145,14 @@ impl StateMachine {
145145
self.height
146146
}
147147

148+
pub(crate) fn prevotes_ref(&self) -> &HashMap<(Round, ValidatorId), (Vote, u32)> {
149+
&self.prevotes
150+
}
151+
152+
pub(crate) fn precommits_ref(&self) -> &HashMap<(Round, ValidatorId), (Vote, u32)> {
153+
&self.precommits
154+
}
155+
148156
fn make_self_vote(
149157
&self,
150158
vote_type: VoteType,
@@ -328,15 +336,15 @@ impl StateMachine {
328336
where
329337
LeaderFn: Fn(Round) -> ValidatorId,
330338
{
331-
let prevote_count = self
332-
.prevotes
333-
.entry(vote.round)
334-
.or_default()
335-
.entry(vote.proposal_commitment)
336-
.or_insert(0);
337-
// TODO(matan): Use variable weight.
338-
*prevote_count += 1;
339-
self.map_round_to_upons(vote.round, leader_fn)
339+
let round = vote.round;
340+
let voter = vote.voter;
341+
let inserted = self.prevotes.insert((round, voter), (vote, 1)).is_none();
342+
assert!(
343+
inserted,
344+
"SHC should handle conflicts & replays: duplicate prevote for round={round}, \
345+
voter={voter}",
346+
);
347+
self.map_round_to_upons(round, leader_fn)
340348
}
341349

342350
pub(crate) fn handle_timeout_prevote(&mut self, round: u32) -> VecDeque<StateMachineEvent> {
@@ -360,15 +368,15 @@ impl StateMachine {
360368
where
361369
LeaderFn: Fn(Round) -> ValidatorId,
362370
{
363-
let precommit_count = self
364-
.precommits
365-
.entry(vote.round)
366-
.or_default()
367-
.entry(vote.proposal_commitment)
368-
.or_insert(0);
369-
// TODO(matan): Use variable weight.
370-
*precommit_count += 1;
371-
self.map_round_to_upons(vote.round, leader_fn)
371+
let round = vote.round;
372+
let voter = vote.voter;
373+
let inserted = self.precommits.insert((round, voter), (vote, 1)).is_none();
374+
assert!(
375+
inserted,
376+
"SHC should handle conflicts & replays: duplicate precommit for round={round}, \
377+
voter={voter}"
378+
);
379+
self.map_round_to_upons(round, leader_fn)
372380
}
373381

374382
fn handle_timeout_precommit<LeaderFn>(
@@ -640,24 +648,36 @@ impl StateMachine {
640648

641649
fn round_has_enough_votes(
642650
&self,
643-
votes: &HashMap<u32, HashMap<Option<ProposalCommitment>, u32>>,
651+
votes: &HashMap<(Round, ValidatorId), (Vote, u32)>,
644652
round: u32,
645653
threshold: &VotesThreshold,
646654
) -> bool {
647-
threshold
648-
.is_met(votes.get(&round).map_or(0, |v| v.values().sum()).into(), self.total_weight)
655+
let weight_sum = votes
656+
.iter()
657+
.filter_map(
658+
|(&(r, _voter), (_v, w))| if r == round { Some(u64::from(*w)) } else { None },
659+
)
660+
.sum();
661+
threshold.is_met(weight_sum, self.total_weight)
649662
}
650663

651664
fn value_has_enough_votes(
652665
&self,
653-
votes: &HashMap<u32, HashMap<Option<ProposalCommitment>, u32>>,
666+
votes: &HashMap<(Round, ValidatorId), (Vote, u32)>,
654667
round: u32,
655668
value: &Option<ProposalCommitment>,
656669
threshold: &VotesThreshold,
657670
) -> bool {
658-
threshold.is_met(
659-
votes.get(&round).map_or(0, |v| *v.get(value).unwrap_or(&0)).into(),
660-
self.total_weight,
661-
)
671+
let weight_sum = votes
672+
.iter()
673+
.filter_map(|(&(r, _voter), (v, w))| {
674+
if r == round && &v.proposal_commitment == value {
675+
Some(u64::from(*w))
676+
} else {
677+
None
678+
}
679+
})
680+
.sum();
681+
threshold.is_met(weight_sum, self.total_weight)
662682
}
663683
}

0 commit comments

Comments
 (0)