Skip to content

Commit 9bb3703

Browse files
apollo_consensus: move vote storage to SM; dedupe in SHC
1 parent f046009 commit 9bb3703

File tree

4 files changed

+178
-90
lines changed

4 files changed

+178
-90
lines changed

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 42 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,6 @@ pub(crate) struct SingleHeightConsensus {
139139
timeouts: TimeoutsConfig,
140140
state_machine: StateMachine,
141141
proposals: HashMap<Round, Option<ProposalCommitment>>,
142-
prevotes: HashMap<(Round, ValidatorId), Vote>,
143-
precommits: HashMap<(Round, ValidatorId), Vote>,
144142
last_prevote: Option<Vote>,
145143
last_precommit: Option<Vote>,
146144
height_voted_storage: Arc<Mutex<dyn HeightVotedStorageTrait>>,
@@ -165,8 +163,6 @@ impl SingleHeightConsensus {
165163
timeouts,
166164
state_machine,
167165
proposals: HashMap::new(),
168-
prevotes: HashMap::new(),
169-
precommits: HashMap::new(),
170166
last_prevote: None,
171167
last_precommit: None,
172168
height_voted_storage,
@@ -259,7 +255,7 @@ impl SingleHeightConsensus {
259255
context.broadcast(last_vote.clone()).await?;
260256
Ok(ShcReturn::Tasks(vec![ShcTask::Prevote(
261257
self.timeouts.get_prevote_timeout(0),
262-
StateMachineEvent::Prevote(last_vote.clone()),
258+
StateMachineEvent::Prevote(vote),
263259
)]))
264260
}
265261
StateMachineEvent::Precommit(vote) => {
@@ -276,7 +272,7 @@ impl SingleHeightConsensus {
276272
context.broadcast(last_vote.clone()).await?;
277273
Ok(ShcReturn::Tasks(vec![ShcTask::Precommit(
278274
self.timeouts.get_precommit_timeout(0),
279-
StateMachineEvent::Precommit(last_vote.clone()),
275+
StateMachineEvent::Precommit(vote),
280276
)]))
281277
}
282278
StateMachineEvent::Proposal(proposal_id, round, valid_round) => {
@@ -354,29 +350,27 @@ impl SingleHeightConsensus {
354350
return Ok(ShcReturn::Tasks(Vec::new()));
355351
}
356352

357-
let (votes, sm_vote) = match vote.vote_type {
358-
VoteType::Prevote => (&mut self.prevotes, StateMachineEvent::Prevote(vote.clone())),
353+
// Check duplicates/conflicts from SM stored votes.
354+
let (votes_map, sm_vote) = match vote.vote_type {
355+
VoteType::Prevote => {
356+
(self.state_machine.prevotes_ref(), StateMachineEvent::Prevote(vote.clone()))
357+
}
359358
VoteType::Precommit => {
360-
(&mut self.precommits, StateMachineEvent::Precommit(vote.clone()))
359+
(self.state_machine.precommits_ref(), StateMachineEvent::Precommit(vote.clone()))
361360
}
362361
};
363-
364-
match votes.entry((vote.round, vote.voter)) {
365-
Entry::Vacant(entry) => {
366-
entry.insert(vote.clone());
367-
}
368-
Entry::Occupied(entry) => {
369-
let old = entry.get();
370-
if old.proposal_commitment != vote.proposal_commitment {
371-
warn!("Conflicting votes: old={:?}, new={:?}", old, vote);
372-
CONSENSUS_CONFLICTING_VOTES.increment(1);
373-
return Ok(ShcReturn::Tasks(Vec::new()));
374-
} else {
375-
// Replay, ignore.
376-
return Ok(ShcReturn::Tasks(Vec::new()));
377-
}
362+
if let Some((old_vote, _)) = votes_map.get(&(vote.round, vote.voter)) {
363+
if old_vote.proposal_commitment == vote.proposal_commitment {
364+
// Duplicate - ignore.
365+
return Ok(ShcReturn::Tasks(Vec::new()));
366+
} else {
367+
// Conflict - ignore and record.
368+
warn!("Conflicting votes: old={old_vote:?}, new={vote:?}");
369+
CONSENSUS_CONFLICTING_VOTES.increment(1);
370+
return Ok(ShcReturn::Tasks(Vec::new()));
378371
}
379372
}
373+
380374
info!("Accepting {:?}", vote);
381375
let height = self.state_machine.height();
382376
let leader_fn = |round: Round| -> ValidatorId { context.proposer(height, round) };
@@ -507,29 +501,24 @@ impl SingleHeightConsensus {
507501
context: &mut ContextT,
508502
vote: Vote,
509503
) -> Result<Vec<ShcTask>, ConsensusError> {
510-
let prevote_timeout = self.timeouts.get_prevote_timeout(vote.round);
511-
let precommit_timeout = self.timeouts.get_precommit_timeout(vote.round);
512-
let (votes, last_vote, task) = match vote.vote_type {
504+
let (last_vote, task) = match vote.vote_type {
513505
VoteType::Prevote => (
514-
&mut self.prevotes,
515506
&mut self.last_prevote,
516-
ShcTask::Prevote(prevote_timeout, StateMachineEvent::Prevote(vote.clone())),
507+
ShcTask::Prevote(
508+
self.timeouts.get_prevote_timeout(0),
509+
StateMachineEvent::Prevote(vote.clone()),
510+
),
517511
),
518512
VoteType::Precommit => (
519-
&mut self.precommits,
520513
&mut self.last_precommit,
521-
ShcTask::Precommit(precommit_timeout, StateMachineEvent::Precommit(vote.clone())),
514+
ShcTask::Precommit(
515+
self.timeouts.get_precommit_timeout(0),
516+
StateMachineEvent::Precommit(vote.clone()),
517+
),
522518
),
523519
};
524520
// Ensure the voter matches this node.
525521
assert_eq!(vote.voter, self.state_machine.validator_id());
526-
if let Some(old) =
527-
votes.insert((vote.round, self.state_machine.validator_id()), vote.clone())
528-
{
529-
return Err(ConsensusError::InternalInconsistency(format!(
530-
"State machine should not send repeat votes: old={old:?}, new={vote:?}"
531-
)));
532-
}
533522
*last_vote = match last_vote {
534523
None => Some(vote.clone()),
535524
Some(last_vote) if vote.round > last_vote.round => Some(vote.clone()),
@@ -583,18 +572,7 @@ impl SingleHeightConsensus {
583572
{block}"
584573
)));
585574
}
586-
let supporting_precommits: Vec<Vote> = self
587-
.validators
588-
.iter()
589-
.filter_map(|v| {
590-
let vote = self.precommits.get(&(round, *v))?;
591-
if vote.proposal_commitment == Some(proposal_id) {
592-
Some(vote.clone())
593-
} else {
594-
None
595-
}
596-
})
597-
.collect();
575+
let supporting_precommits = self.precommit_votes_for_value(round, Some(proposal_id));
598576

599577
// TODO(matan): Check actual weights.
600578
let vote_weight = u64::try_from(supporting_precommits.len())
@@ -610,4 +588,18 @@ impl SingleHeightConsensus {
610588
}
611589
Ok(ShcReturn::Decision(Decision { precommits: supporting_precommits, block }))
612590
}
591+
592+
fn precommit_votes_for_value(
593+
&self,
594+
round: Round,
595+
value: Option<ProposalCommitment>,
596+
) -> Vec<Vote> {
597+
self.state_machine
598+
.precommits_ref()
599+
.iter()
600+
.filter_map(|(&(r, _voter), (v, _w))| {
601+
if r == round && v.proposal_commitment == value { Some(v.clone()) } else { None }
602+
})
603+
.collect()
604+
}
613605
}

crates/apollo_consensus/src/single_height_consensus_test.rs

Lines changed: 54 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ fn get_proposal_init_for_height(height: BlockNumber) -> ProposalInit {
5454
}
5555

5656
fn prevote_task(block_felt: Option<Felt>, height: u64, round: u32, voter: ValidatorId) -> ShcTask {
57-
let duration = TIMEOUTS.get_prevote_timeout(round);
57+
let duration = TIMEOUTS.get_prevote_timeout(0);
5858
ShcTask::Prevote(
5959
duration,
6060
StateMachineEvent::Prevote(prevote(block_felt, height, round, voter)),
@@ -67,7 +67,7 @@ fn precommit_task(
6767
round: u32,
6868
voter: ValidatorId,
6969
) -> ShcTask {
70-
let duration = TIMEOUTS.get_precommit_timeout(round);
70+
let duration = TIMEOUTS.get_precommit_timeout(0);
7171
ShcTask::Precommit(
7272
duration,
7373
StateMachineEvent::Precommit(precommit(block_felt, height, round, voter)),
@@ -357,7 +357,7 @@ async fn rebroadcast_votes() {
357357
Arc::new(Mutex::new(NoOpHeightVotedStorage)),
358358
);
359359

360-
context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID);
360+
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
361361
context.expect_build_proposal().times(1).returning(move |_, _| {
362362
let (block_sender, block_receiver) = oneshot::channel();
363363
block_sender.send(BLOCK.id).unwrap();
@@ -383,27 +383,72 @@ async fn rebroadcast_votes() {
383383
// 3 of 4 Prevotes is enough to send a Precommit.
384384
context
385385
.expect_broadcast()
386-
.times(2) // vote rebroadcast
387-
.withf(move |msg: &Vote| {
388-
msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)
389-
})
386+
.times(1) // just the first precommit at round 0.
387+
.withf(move |msg: &Vote| msg == &precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
390388
.returning(move |_| Ok(()));
391389
// The Node got a Prevote quorum.
392390
assert_eq!(
393391
shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await,
394392
Ok(ShcReturn::Tasks(vec![
395393
timeout_prevote_task(0),
394+
// initiate timeout for rebroadcast at round 0.
396395
precommit_task(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID),
397396
]))
398397
);
399-
// Re-broadcast vote.
398+
// Advance to the next round with NIL precommits.
399+
shc.handle_vote(&mut context, precommit(None, 0, 0, *VALIDATOR_ID_1)).await.unwrap();
400+
shc.handle_vote(&mut context, precommit(None, 0, 0, *VALIDATOR_ID_2)).await.unwrap();
401+
context.expect_repropose().returning(move |id, init| {
402+
assert_eq!(init.height, BlockNumber(0));
403+
assert_eq!(id, BLOCK.id);
404+
assert_eq!(init.round, 1);
405+
});
406+
context
407+
.expect_broadcast()
408+
.times(1)
409+
.withf(move |msg: &Vote| msg == &prevote(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID))
410+
.returning(move |_| Ok(()));
411+
shc.handle_event(&mut context, StateMachineEvent::TimeoutPrecommit(0)).await.unwrap();
412+
context
413+
.expect_broadcast()
414+
.times(2) // first: initial precommit vote at r1, second: rebroadcast at r1
415+
.withf(move |msg: &Vote| msg == &precommit(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID))
416+
.returning(move |_| Ok(()));
417+
assert_eq!(
418+
shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 1, *VALIDATOR_ID_2)).await,
419+
Ok(ShcReturn::Tasks(Vec::new()))
420+
);
421+
assert_eq!(
422+
shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 1, *VALIDATOR_ID_3)).await,
423+
Ok(ShcReturn::Tasks(vec![
424+
timeout_prevote_task(1),
425+
// initiate timeout for rebroadcast at round 1.
426+
precommit_task(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID),
427+
]))
428+
);
429+
430+
// Re-broadcast with older vote (round 0) - should be ignored (no broadcast, no task).
400431
assert_eq!(
401432
shc.handle_event(
402433
&mut context,
403434
StateMachineEvent::Precommit(precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID))
404435
)
405436
.await,
406-
Ok(ShcReturn::Tasks(vec![precommit_task(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID),]))
437+
Ok(ShcReturn::Tasks(Vec::new()))
438+
);
439+
440+
// Re-broadcast with current round (round 1) - should broadcast and schedule another timeout for
441+
// rebroadcast at round 1.
442+
assert_eq!(
443+
shc.handle_event(
444+
&mut context,
445+
StateMachineEvent::Precommit(precommit(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID))
446+
)
447+
.await,
448+
Ok(ShcReturn::Tasks(vec![ShcTask::Precommit(
449+
TIMEOUTS.get_precommit_timeout(0),
450+
StateMachineEvent::Precommit(precommit(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID)),
451+
)]))
407452
);
408453
}
409454

0 commit comments

Comments
 (0)