Skip to content

Commit 0891466

Browse files
apollo_consensus: move vote storage to SM; dedupe in SHC
1 parent 87b8c38 commit 0891466

File tree

3 files changed

+91
-73
lines changed

3 files changed

+91
-73
lines changed

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 32 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,6 @@ pub(crate) struct SingleHeightConsensus {
137137
timeouts: TimeoutsConfig,
138138
state_machine: StateMachine,
139139
proposals: HashMap<Round, Option<ProposalCommitment>>,
140-
prevotes: HashMap<(Round, ValidatorId), Vote>,
141-
precommits: HashMap<(Round, ValidatorId), Vote>,
142140
last_prevote: Option<Vote>,
143141
last_precommit: Option<Vote>,
144142
}
@@ -161,8 +159,6 @@ impl SingleHeightConsensus {
161159
timeouts,
162160
state_machine,
163161
proposals: HashMap::new(),
164-
prevotes: HashMap::new(),
165-
precommits: HashMap::new(),
166162
last_prevote: None,
167163
last_precommit: None,
168164
}
@@ -354,29 +350,27 @@ impl SingleHeightConsensus {
354350
return Ok(ShcReturn::Tasks(Vec::new()));
355351
}
356352

357-
let (votes, sm_vote) = match vote.vote_type {
358-
VoteType::Prevote => (&mut self.prevotes, StateMachineEvent::Prevote(vote.clone())),
353+
// Check duplicates/conflicts from SM stored votes.
354+
let (votes_map, sm_vote) = match vote.vote_type {
355+
VoteType::Prevote => {
356+
(self.state_machine.prevotes_ref(), StateMachineEvent::Prevote(vote.clone()))
357+
}
359358
VoteType::Precommit => {
360-
(&mut self.precommits, StateMachineEvent::Precommit(vote.clone()))
359+
(self.state_machine.precommits_ref(), StateMachineEvent::Precommit(vote.clone()))
361360
}
362361
};
363-
364-
match votes.entry((vote.round, vote.voter)) {
365-
Entry::Vacant(entry) => {
366-
entry.insert(vote.clone());
367-
}
368-
Entry::Occupied(entry) => {
369-
let old = entry.get();
370-
if old.proposal_commitment != vote.proposal_commitment {
371-
warn!("Conflicting votes: old={:?}, new={:?}", old, vote);
372-
CONSENSUS_CONFLICTING_VOTES.increment(1);
373-
return Ok(ShcReturn::Tasks(Vec::new()));
374-
} else {
375-
// Replay, ignore.
376-
return Ok(ShcReturn::Tasks(Vec::new()));
377-
}
362+
if let Some((old_vote, _)) = votes_map.get(&(vote.round, vote.voter)) {
363+
if old_vote.proposal_commitment == vote.proposal_commitment {
364+
// Duplicate - ignore.
365+
return Ok(ShcReturn::Tasks(Vec::new()));
366+
} else {
367+
// Conflict - ignore and record.
368+
warn!("Conflicting votes: old={old_vote:?}, new={vote:?}");
369+
CONSENSUS_CONFLICTING_VOTES.increment(1);
370+
return Ok(ShcReturn::Tasks(Vec::new()));
378371
}
379372
}
373+
380374
info!("Accepting {:?}", vote);
381375
let height = self.state_machine.height();
382376
let leader_fn = |round: Round| -> ValidatorId { context.proposer(height, round) };
@@ -493,17 +487,15 @@ impl SingleHeightConsensus {
493487
context: &mut ContextT,
494488
vote: Vote,
495489
) -> Result<Vec<ShcTask>, ConsensusError> {
496-
let (votes, last_vote, task) = match vote.vote_type {
490+
let (last_vote, task) = match vote.vote_type {
497491
VoteType::Prevote => (
498-
&mut self.prevotes,
499492
&mut self.last_prevote,
500493
ShcTask::Prevote(
501494
self.timeouts.prevote_timeout,
502495
StateMachineEvent::Prevote(vote.clone()),
503496
),
504497
),
505498
VoteType::Precommit => (
506-
&mut self.precommits,
507499
&mut self.last_precommit,
508500
ShcTask::Precommit(
509501
self.timeouts.precommit_timeout,
@@ -513,11 +505,6 @@ impl SingleHeightConsensus {
513505
};
514506
// Ensure the voter matches this node.
515507
assert_eq!(vote.voter, self.state_machine.id());
516-
if let Some(old) = votes.insert((vote.round, self.state_machine.id()), vote.clone()) {
517-
return Err(ConsensusError::InternalInconsistency(format!(
518-
"State machine should not send repeat votes: old={old:?}, new={vote:?}"
519-
)));
520-
}
521508
*last_vote = match last_vote {
522509
None => Some(vote.clone()),
523510
Some(last_vote) if vote.round > last_vote.round => Some(vote.clone()),
@@ -562,18 +549,7 @@ impl SingleHeightConsensus {
562549
{block}"
563550
)));
564551
}
565-
let supporting_precommits: Vec<Vote> = self
566-
.validators
567-
.iter()
568-
.filter_map(|v| {
569-
let vote = self.precommits.get(&(round, *v))?;
570-
if vote.proposal_commitment == Some(proposal_id) {
571-
Some(vote.clone())
572-
} else {
573-
None
574-
}
575-
})
576-
.collect();
552+
let supporting_precommits = self.precommit_votes_for_value(round, Some(proposal_id));
577553

578554
// TODO(matan): Check actual weights.
579555
let vote_weight = u64::try_from(supporting_precommits.len())
@@ -589,4 +565,18 @@ impl SingleHeightConsensus {
589565
}
590566
Ok(ShcReturn::Decision(Decision { precommits: supporting_precommits, block }))
591567
}
568+
569+
fn precommit_votes_for_value(
570+
&self,
571+
round: Round,
572+
value: Option<ProposalCommitment>,
573+
) -> Vec<Vote> {
574+
self.state_machine
575+
.precommits_ref()
576+
.iter()
577+
.filter_map(|(&(r, _voter), (v, _w))| {
578+
if r == round && v.proposal_commitment == value { Some(v.clone()) } else { None }
579+
})
580+
.collect()
581+
}
592582
}

crates/apollo_consensus/src/state_machine.rs

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ pub(crate) struct StateMachine {
7878
is_observer: bool,
7979
// {round: (proposal_id, valid_round)}
8080
proposals: HashMap<Round, (Option<ProposalCommitment>, Option<Round>)>,
81-
// {round: {proposal_id: vote_count}
82-
prevotes: HashMap<Round, HashMap<Option<ProposalCommitment>, u32>>,
83-
precommits: HashMap<Round, HashMap<Option<ProposalCommitment>, u32>>,
81+
// {(round, voter): (vote, weight)}
82+
prevotes: HashMap<(Round, ValidatorId), (Vote, u32)>,
83+
precommits: HashMap<(Round, ValidatorId), (Vote, u32)>,
8484
// When true, the state machine will wait for a GetProposal event, buffering all other input
8585
// events in `events_queue`.
8686
awaiting_get_proposal: bool,
@@ -145,6 +145,14 @@ impl StateMachine {
145145
self.height
146146
}
147147

148+
pub(crate) fn prevotes_ref(&self) -> &HashMap<(Round, ValidatorId), (Vote, u32)> {
149+
&self.prevotes
150+
}
151+
152+
pub(crate) fn precommits_ref(&self) -> &HashMap<(Round, ValidatorId), (Vote, u32)> {
153+
&self.precommits
154+
}
155+
148156
fn make_self_vote(
149157
&self,
150158
vote_type: VoteType,
@@ -328,15 +336,9 @@ impl StateMachine {
328336
where
329337
LeaderFn: Fn(Round) -> ValidatorId,
330338
{
331-
let prevote_count = self
332-
.prevotes
333-
.entry(vote.round)
334-
.or_default()
335-
.entry(vote.proposal_commitment)
336-
.or_insert(0);
337-
// TODO(matan): Use variable weight.
338-
*prevote_count += 1;
339-
self.map_round_to_upons(vote.round, leader_fn)
339+
let round = vote.round;
340+
self.prevotes.entry((round, vote.voter)).or_insert((vote, 1));
341+
self.map_round_to_upons(round, leader_fn)
340342
}
341343

342344
pub(crate) fn handle_timeout_prevote(&mut self, round: u32) -> VecDeque<StateMachineEvent> {
@@ -360,15 +362,9 @@ impl StateMachine {
360362
where
361363
LeaderFn: Fn(Round) -> ValidatorId,
362364
{
363-
let precommit_count = self
364-
.precommits
365-
.entry(vote.round)
366-
.or_default()
367-
.entry(vote.proposal_commitment)
368-
.or_insert(0);
369-
// TODO(matan): Use variable weight.
370-
*precommit_count += 1;
371-
self.map_round_to_upons(vote.round, leader_fn)
365+
let round = vote.round;
366+
self.precommits.entry((round, vote.voter)).or_insert((vote, 1));
367+
self.map_round_to_upons(round, leader_fn)
372368
}
373369

374370
fn handle_timeout_precommit<LeaderFn>(
@@ -640,24 +636,36 @@ impl StateMachine {
640636

641637
fn round_has_enough_votes(
642638
&self,
643-
votes: &HashMap<u32, HashMap<Option<ProposalCommitment>, u32>>,
639+
votes: &HashMap<(Round, ValidatorId), (Vote, u32)>,
644640
round: u32,
645641
threshold: &VotesThreshold,
646642
) -> bool {
647-
threshold
648-
.is_met(votes.get(&round).map_or(0, |v| v.values().sum()).into(), self.total_weight)
643+
let weight_sum = votes
644+
.iter()
645+
.filter_map(
646+
|(&(r, _voter), (_v, w))| if r == round { Some(u64::from(*w)) } else { None },
647+
)
648+
.sum();
649+
threshold.is_met(weight_sum, self.total_weight)
649650
}
650651

651652
fn value_has_enough_votes(
652653
&self,
653-
votes: &HashMap<u32, HashMap<Option<ProposalCommitment>, u32>>,
654+
votes: &HashMap<(Round, ValidatorId), (Vote, u32)>,
654655
round: u32,
655656
value: &Option<ProposalCommitment>,
656657
threshold: &VotesThreshold,
657658
) -> bool {
658-
threshold.is_met(
659-
votes.get(&round).map_or(0, |v| *v.get(value).unwrap_or(&0)).into(),
660-
self.total_weight,
661-
)
659+
let weight_sum = votes
660+
.iter()
661+
.filter_map(|(&(r, _voter), (v, w))| {
662+
if r == round && &v.proposal_commitment == value {
663+
Some(u64::from(*w))
664+
} else {
665+
None
666+
}
667+
})
668+
.sum();
669+
threshold.is_met(weight_sum, self.total_weight)
662670
}
663671
}

crates/apollo_consensus/src/state_machine_test.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use crate::votes_threshold::QuorumType;
1414
lazy_static! {
1515
static ref PROPOSER_ID: ValidatorId = DEFAULT_VALIDATOR_ID.into();
1616
static ref VALIDATOR_ID: ValidatorId = (DEFAULT_VALIDATOR_ID + 1).into();
17+
static ref VALIDATOR_ID_2: ValidatorId = (DEFAULT_VALIDATOR_ID + 2).into();
18+
static ref VALIDATOR_ID_3: ValidatorId = (DEFAULT_VALIDATOR_ID + 3).into();
1719
}
1820

1921
const PROPOSAL_ID: Option<ProposalCommitment> = Some(ProposalCommitment(Felt::ONE));
@@ -33,6 +35,8 @@ struct TestWrapper<LeaderFn: Fn(Round) -> ValidatorId> {
3335
state_machine: StateMachine,
3436
leader_fn: LeaderFn,
3537
events: VecDeque<StateMachineEvent>,
38+
peer_voters: Vec<ValidatorId>,
39+
next_peer_idx: usize,
3640
}
3741

3842
impl<LeaderFn: Fn(Round) -> ValidatorId> TestWrapper<LeaderFn> {
@@ -43,13 +47,27 @@ impl<LeaderFn: Fn(Round) -> ValidatorId> TestWrapper<LeaderFn> {
4347
is_observer: bool,
4448
quorum_type: QuorumType,
4549
) -> Self {
50+
let mut peer_voters = vec![*PROPOSER_ID, *VALIDATOR_ID, *VALIDATOR_ID_2, *VALIDATOR_ID_3]
51+
.into_iter()
52+
.filter(|v| *v != id)
53+
.collect::<Vec<_>>();
54+
// Ensure deterministic order.
55+
peer_voters.sort();
4656
Self {
4757
state_machine: StateMachine::new(HEIGHT, id, total_weight, is_observer, quorum_type),
4858
leader_fn,
4959
events: VecDeque::new(),
60+
peer_voters,
61+
next_peer_idx: 0,
5062
}
5163
}
5264

65+
fn next_peer(&mut self) -> ValidatorId {
66+
let voter = self.peer_voters[self.next_peer_idx % self.peer_voters.len()];
67+
self.next_peer_idx += 1;
68+
voter
69+
}
70+
5371
pub fn next_event(&mut self) -> Option<StateMachineEvent> {
5472
self.events.pop_front()
5573
}
@@ -67,20 +85,22 @@ impl<LeaderFn: Fn(Round) -> ValidatorId> TestWrapper<LeaderFn> {
6785
}
6886

6987
pub fn send_prevote(&mut self, proposal_id: Option<ProposalCommitment>, round: Round) {
88+
let voter = self.next_peer();
7089
self.send_event(StateMachineEvent::Prevote(mk_vote(
7190
VoteType::Prevote,
7291
round,
7392
proposal_id,
74-
*PROPOSER_ID,
93+
voter,
7594
)))
7695
}
7796

7897
pub fn send_precommit(&mut self, proposal_id: Option<ProposalCommitment>, round: Round) {
98+
let voter = self.next_peer();
7999
self.send_event(StateMachineEvent::Precommit(mk_vote(
80100
VoteType::Precommit,
81101
round,
82102
proposal_id,
83-
*PROPOSER_ID,
103+
voter,
84104
)))
85105
}
86106

0 commit comments

Comments
 (0)