Skip to content

Commit aa597f5

Browse files
apollo_consensus: move vote storage to SM; dedupe in SHC
1 parent 5f08fb8 commit aa597f5

File tree

3 files changed

+99
-79
lines changed

3 files changed

+99
-79
lines changed

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 40 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,6 @@ pub(crate) struct SingleHeightConsensus {
137137
timeouts: TimeoutsConfig,
138138
state_machine: StateMachine,
139139
proposals: HashMap<Round, Option<ProposalCommitment>>,
140-
prevotes: HashMap<(Round, ValidatorId), Vote>,
141-
precommits: HashMap<(Round, ValidatorId), Vote>,
142140
last_prevote: Option<Vote>,
143141
last_precommit: Option<Vote>,
144142
}
@@ -161,8 +159,6 @@ impl SingleHeightConsensus {
161159
timeouts,
162160
state_machine,
163161
proposals: HashMap::new(),
164-
prevotes: HashMap::new(),
165-
precommits: HashMap::new(),
166162
last_prevote: None,
167163
last_precommit: None,
168164
}
@@ -349,29 +345,27 @@ impl SingleHeightConsensus {
349345
return Ok(ShcReturn::Tasks(Vec::new()));
350346
}
351347

352-
let (votes, sm_vote) = match vote.vote_type {
353-
VoteType::Prevote => (&mut self.prevotes, StateMachineEvent::Prevote(vote.clone())),
348+
// Check duplicates/conflicts from SM stored votes.
349+
let (votes_map, sm_vote) = match vote.vote_type {
350+
VoteType::Prevote => {
351+
(self.state_machine.prevotes_ref(), StateMachineEvent::Prevote(vote.clone()))
352+
}
354353
VoteType::Precommit => {
355-
(&mut self.precommits, StateMachineEvent::Precommit(vote.clone()))
354+
(self.state_machine.precommits_ref(), StateMachineEvent::Precommit(vote.clone()))
356355
}
357356
};
358-
359-
match votes.entry((vote.round, vote.voter)) {
360-
Entry::Vacant(entry) => {
361-
entry.insert(vote.clone());
362-
}
363-
Entry::Occupied(entry) => {
364-
let old = entry.get();
365-
if old.proposal_commitment != vote.proposal_commitment {
366-
warn!("Conflicting votes: old={:?}, new={:?}", old, vote);
367-
CONSENSUS_CONFLICTING_VOTES.increment(1);
368-
return Ok(ShcReturn::Tasks(Vec::new()));
369-
} else {
370-
// Replay, ignore.
371-
return Ok(ShcReturn::Tasks(Vec::new()));
372-
}
357+
if let Some((old_vote, _)) = votes_map.get(&(vote.round, vote.voter)) {
358+
if old_vote.proposal_commitment == vote.proposal_commitment {
359+
// Duplicate - ignore.
360+
return Ok(ShcReturn::Tasks(Vec::new()));
361+
} else {
362+
// Conflict - ignore and record.
363+
warn!("Conflicting votes: old={old_vote:?}, new={vote:?}");
364+
CONSENSUS_CONFLICTING_VOTES.increment(1);
365+
return Ok(ShcReturn::Tasks(Vec::new()));
373366
}
374367
}
368+
375369
info!("Accepting {:?}", vote);
376370
let height = self.state_machine.height();
377371
let leader_fn = |round: Round| -> ValidatorId { context.proposer(height, round) };
@@ -502,29 +496,24 @@ impl SingleHeightConsensus {
502496
context: &mut ContextT,
503497
vote: Vote,
504498
) -> Result<Vec<ShcTask>, ConsensusError> {
505-
let prevote_timeout = self.timeouts.get_prevote_timeout(vote.round);
506-
let precommit_timeout = self.timeouts.get_precommit_timeout(vote.round);
507-
let (votes, last_vote, task) = match vote.vote_type {
499+
let (last_vote, task) = match vote.vote_type {
508500
VoteType::Prevote => (
509-
&mut self.prevotes,
510501
&mut self.last_prevote,
511-
ShcTask::Prevote(prevote_timeout, StateMachineEvent::Prevote(vote.clone())),
502+
ShcTask::Prevote(
503+
self.timeouts.get_prevote_timeout(vote.round),
504+
StateMachineEvent::Prevote(vote.clone()),
505+
),
512506
),
513507
VoteType::Precommit => (
514-
&mut self.precommits,
515508
&mut self.last_precommit,
516-
ShcTask::Precommit(precommit_timeout, StateMachineEvent::Precommit(vote.clone())),
509+
ShcTask::Precommit(
510+
self.timeouts.get_precommit_timeout(vote.round),
511+
StateMachineEvent::Precommit(vote.clone()),
512+
),
517513
),
518514
};
519515
// Ensure the voter matches this node.
520516
assert_eq!(vote.voter, self.state_machine.validator_id());
521-
if let Some(old) =
522-
votes.insert((vote.round, self.state_machine.validator_id()), vote.clone())
523-
{
524-
return Err(ConsensusError::InternalInconsistency(format!(
525-
"State machine should not send repeat votes: old={old:?}, new={vote:?}"
526-
)));
527-
}
528517
*last_vote = match last_vote {
529518
None => Some(vote.clone()),
530519
Some(last_vote) if vote.round > last_vote.round => Some(vote.clone()),
@@ -569,18 +558,7 @@ impl SingleHeightConsensus {
569558
{block}"
570559
)));
571560
}
572-
let supporting_precommits: Vec<Vote> = self
573-
.validators
574-
.iter()
575-
.filter_map(|v| {
576-
let vote = self.precommits.get(&(round, *v))?;
577-
if vote.proposal_commitment == Some(proposal_id) {
578-
Some(vote.clone())
579-
} else {
580-
None
581-
}
582-
})
583-
.collect();
561+
let supporting_precommits = self.precommit_votes_for_value(round, Some(proposal_id));
584562

585563
// TODO(matan): Check actual weights.
586564
let vote_weight = u64::try_from(supporting_precommits.len())
@@ -596,4 +574,18 @@ impl SingleHeightConsensus {
596574
}
597575
Ok(ShcReturn::Decision(Decision { precommits: supporting_precommits, block }))
598576
}
577+
578+
fn precommit_votes_for_value(
579+
&self,
580+
round: Round,
581+
value: Option<ProposalCommitment>,
582+
) -> Vec<Vote> {
583+
self.state_machine
584+
.precommits_ref()
585+
.iter()
586+
.filter_map(|(&(r, _voter), (v, _w))| {
587+
if r == round && v.proposal_commitment == value { Some(v.clone()) } else { None }
588+
})
589+
.collect()
590+
}
599591
}

crates/apollo_consensus/src/state_machine.rs

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,9 @@ pub(crate) struct StateMachine {
7878
is_observer: bool,
7979
// {round: (proposal_id, valid_round)}
8080
proposals: HashMap<Round, (Option<ProposalCommitment>, Option<Round>)>,
81-
// {round: {proposal_id: vote_count}
82-
prevotes: HashMap<Round, HashMap<Option<ProposalCommitment>, u32>>,
83-
precommits: HashMap<Round, HashMap<Option<ProposalCommitment>, u32>>,
81+
// {(round, voter): (vote, weight)}
82+
prevotes: HashMap<(Round, ValidatorId), (Vote, u32)>,
83+
precommits: HashMap<(Round, ValidatorId), (Vote, u32)>,
8484
// When true, the state machine will wait for a GetProposal event, buffering all other input
8585
// events in `events_queue`.
8686
awaiting_get_proposal: bool,
@@ -145,6 +145,14 @@ impl StateMachine {
145145
self.height
146146
}
147147

148+
pub(crate) fn prevotes_ref(&self) -> &HashMap<(Round, ValidatorId), (Vote, u32)> {
149+
&self.prevotes
150+
}
151+
152+
pub(crate) fn precommits_ref(&self) -> &HashMap<(Round, ValidatorId), (Vote, u32)> {
153+
&self.precommits
154+
}
155+
148156
fn make_self_vote(
149157
&self,
150158
vote_type: VoteType,
@@ -328,15 +336,9 @@ impl StateMachine {
328336
where
329337
LeaderFn: Fn(Round) -> ValidatorId,
330338
{
331-
let prevote_count = self
332-
.prevotes
333-
.entry(vote.round)
334-
.or_default()
335-
.entry(vote.proposal_commitment)
336-
.or_insert(0);
337-
// TODO(matan): Use variable weight.
338-
*prevote_count += 1;
339-
self.map_round_to_upons(vote.round, leader_fn)
339+
let round = vote.round;
340+
self.prevotes.entry((round, vote.voter)).or_insert((vote, 1));
341+
self.map_round_to_upons(round, leader_fn)
340342
}
341343

342344
pub(crate) fn handle_timeout_prevote(&mut self, round: u32) -> VecDeque<StateMachineEvent> {
@@ -360,15 +362,9 @@ impl StateMachine {
360362
where
361363
LeaderFn: Fn(Round) -> ValidatorId,
362364
{
363-
let precommit_count = self
364-
.precommits
365-
.entry(vote.round)
366-
.or_default()
367-
.entry(vote.proposal_commitment)
368-
.or_insert(0);
369-
// TODO(matan): Use variable weight.
370-
*precommit_count += 1;
371-
self.map_round_to_upons(vote.round, leader_fn)
365+
let round = vote.round;
366+
self.precommits.entry((round, vote.voter)).or_insert((vote, 1));
367+
self.map_round_to_upons(round, leader_fn)
372368
}
373369

374370
fn handle_timeout_precommit<LeaderFn>(
@@ -640,24 +636,36 @@ impl StateMachine {
640636

641637
fn round_has_enough_votes(
642638
&self,
643-
votes: &HashMap<u32, HashMap<Option<ProposalCommitment>, u32>>,
639+
votes: &HashMap<(Round, ValidatorId), (Vote, u32)>,
644640
round: u32,
645641
threshold: &VotesThreshold,
646642
) -> bool {
647-
threshold
648-
.is_met(votes.get(&round).map_or(0, |v| v.values().sum()).into(), self.total_weight)
643+
let weight_sum = votes
644+
.iter()
645+
.filter_map(
646+
|(&(r, _voter), (_v, w))| if r == round { Some(u64::from(*w)) } else { None },
647+
)
648+
.sum();
649+
threshold.is_met(weight_sum, self.total_weight)
649650
}
650651

651652
fn value_has_enough_votes(
652653
&self,
653-
votes: &HashMap<u32, HashMap<Option<ProposalCommitment>, u32>>,
654+
votes: &HashMap<(Round, ValidatorId), (Vote, u32)>,
654655
round: u32,
655656
value: &Option<ProposalCommitment>,
656657
threshold: &VotesThreshold,
657658
) -> bool {
658-
threshold.is_met(
659-
votes.get(&round).map_or(0, |v| *v.get(value).unwrap_or(&0)).into(),
660-
self.total_weight,
661-
)
659+
let weight_sum = votes
660+
.iter()
661+
.filter_map(|(&(r, _voter), (v, w))| {
662+
if r == round && &v.proposal_commitment == value {
663+
Some(u64::from(*w))
664+
} else {
665+
None
666+
}
667+
})
668+
.sum();
669+
threshold.is_met(weight_sum, self.total_weight)
662670
}
663671
}

crates/apollo_consensus/src/state_machine_test.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use crate::votes_threshold::QuorumType;
1414
lazy_static! {
1515
static ref PROPOSER_ID: ValidatorId = DEFAULT_VALIDATOR_ID.into();
1616
static ref VALIDATOR_ID: ValidatorId = (DEFAULT_VALIDATOR_ID + 1).into();
17+
static ref VALIDATOR_ID_2: ValidatorId = (DEFAULT_VALIDATOR_ID + 2).into();
18+
static ref VALIDATOR_ID_3: ValidatorId = (DEFAULT_VALIDATOR_ID + 3).into();
1719
}
1820

1921
const PROPOSAL_ID: Option<ProposalCommitment> = Some(ProposalCommitment(Felt::ONE));
@@ -33,6 +35,8 @@ struct TestWrapper<LeaderFn: Fn(Round) -> ValidatorId> {
3335
state_machine: StateMachine,
3436
leader_fn: LeaderFn,
3537
events: VecDeque<StateMachineEvent>,
38+
peer_voters: Vec<ValidatorId>,
39+
next_peer_idx: usize,
3640
}
3741

3842
impl<LeaderFn: Fn(Round) -> ValidatorId> TestWrapper<LeaderFn> {
@@ -43,13 +47,27 @@ impl<LeaderFn: Fn(Round) -> ValidatorId> TestWrapper<LeaderFn> {
4347
is_observer: bool,
4448
quorum_type: QuorumType,
4549
) -> Self {
50+
let mut peer_voters = vec![*PROPOSER_ID, *VALIDATOR_ID, *VALIDATOR_ID_2, *VALIDATOR_ID_3]
51+
.into_iter()
52+
.filter(|v| *v != id)
53+
.collect::<Vec<_>>();
54+
// Ensure deterministic order.
55+
peer_voters.sort();
4656
Self {
4757
state_machine: StateMachine::new(HEIGHT, id, total_weight, is_observer, quorum_type),
4858
leader_fn,
4959
events: VecDeque::new(),
60+
peer_voters,
61+
next_peer_idx: 0,
5062
}
5163
}
5264

65+
fn next_peer(&mut self) -> ValidatorId {
66+
let voter = self.peer_voters[self.next_peer_idx % self.peer_voters.len()];
67+
self.next_peer_idx += 1;
68+
voter
69+
}
70+
5371
pub fn next_event(&mut self) -> Option<StateMachineEvent> {
5472
self.events.pop_front()
5573
}
@@ -67,20 +85,22 @@ impl<LeaderFn: Fn(Round) -> ValidatorId> TestWrapper<LeaderFn> {
6785
}
6886

6987
pub fn send_prevote(&mut self, proposal_id: Option<ProposalCommitment>, round: Round) {
88+
let voter = self.next_peer();
7089
self.send_event(StateMachineEvent::Prevote(mk_vote(
7190
VoteType::Prevote,
7291
round,
7392
proposal_id,
74-
*PROPOSER_ID,
93+
voter,
7594
)))
7695
}
7796

7897
pub fn send_precommit(&mut self, proposal_id: Option<ProposalCommitment>, round: Round) {
98+
let voter = self.next_peer();
7999
self.send_event(StateMachineEvent::Precommit(mk_vote(
80100
VoteType::Precommit,
81101
round,
82102
proposal_id,
83-
*PROPOSER_ID,
103+
voter,
84104
)))
85105
}
86106

0 commit comments

Comments
 (0)