Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 17 additions & 22 deletions crates/apollo_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::metrics::{
CONSENSUS_REPROPOSALS,
};
use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
use crate::state_machine::{SMRequest, StateMachineEvent};
use crate::state_machine::{SMRequest, StateMachineEvent, Step};
use crate::storage::HeightVotedStorageTrait;
use crate::types::{
BroadcastVoteChannel,
Expand Down Expand Up @@ -785,29 +785,24 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
.boxed();
Ok(Some(fut))
}
SMRequest::ScheduleTimeoutPropose(round) => {
let duration = timeouts.get_proposal_timeout(round);
let fut = async move {
tokio::time::sleep(duration).await;
StateMachineEvent::TimeoutPropose(round)
}
.boxed();
Ok(Some(fut))
}
SMRequest::ScheduleTimeoutPrevote(round) => {
let duration = timeouts.get_prevote_timeout(round);
let fut = async move {
tokio::time::sleep(duration).await;
StateMachineEvent::TimeoutPrevote(round)
}
.boxed();
Ok(Some(fut))
}
SMRequest::ScheduleTimeoutPrecommit(round) => {
let duration = timeouts.get_precommit_timeout(round);
SMRequest::ScheduleTimeout(step, round) => {
let (duration, event) = match step {
Step::Propose => (
timeouts.get_proposal_timeout(round),
StateMachineEvent::TimeoutPropose(round),
),
Step::Prevote => (
timeouts.get_prevote_timeout(round),
StateMachineEvent::TimeoutPrevote(round),
),
Step::Precommit => (
timeouts.get_precommit_timeout(round),
StateMachineEvent::TimeoutPrecommit(round),
),
};
let fut = async move {
tokio::time::sleep(duration).await;
StateMachineEvent::TimeoutPrecommit(round)
event
}
.boxed();
Ok(Some(fut))
Expand Down
18 changes: 9 additions & 9 deletions crates/apollo_consensus/src/single_height_consensus_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use test_case::test_case;

use super::SingleHeightConsensus;
use crate::single_height_consensus::ShcReturn;
use crate::state_machine::{SMRequest, StateMachineEvent};
use crate::state_machine::{SMRequest, StateMachineEvent, Step};
use crate::test_utils::{precommit, prevote, TestBlock};
use crate::types::{ProposalCommitment, ValidatorId};
use crate::votes_threshold::QuorumType;
Expand Down Expand Up @@ -63,7 +63,7 @@ fn proposer() {
shc.handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).unwrap();
// Expect a precommit broadcast request present.
assert_matches!(ret, ShcReturn::Requests(mut reqs) => {
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeoutPrevote(0)));
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 0)));
assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit);
assert!(reqs.is_empty());
});
Expand Down Expand Up @@ -125,7 +125,7 @@ fn validator(repeat_proposal: bool) {
shc.handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_3)).unwrap();
// Expect a precommit broadcast request present.
assert_matches!(ret, ShcReturn::Requests(mut reqs) => {
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeoutPrevote(0)));
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 0)));
assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit);
assert!(reqs.is_empty());
});
Expand Down Expand Up @@ -169,7 +169,7 @@ fn vote_twice(same_vote: bool) {
.unwrap();
// On quorum of prevotes, expect a precommit broadcast request.
assert_matches!(res, ShcReturn::Requests(mut reqs) => {
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeoutPrevote(0)));
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 0)));
assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit);
assert!(reqs.is_empty());
});
Expand Down Expand Up @@ -227,7 +227,7 @@ fn rebroadcast_votes() {
shc.handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).unwrap();
// Expect a precommit broadcast at round 0.
assert_matches!(ret, ShcReturn::Requests(mut reqs) => {
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeoutPrevote(0)));
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 0)));
assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit && v.round == 0);
assert!(reqs.is_empty());
});
Expand All @@ -250,7 +250,7 @@ fn rebroadcast_votes() {
let ret =
shc.handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), 0, 1, *VALIDATOR_ID_3)).unwrap();
assert_matches!(ret, ShcReturn::Requests(mut reqs) => {
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeoutPrevote(1)));
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 1)));
assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit && v.round == 1);
assert!(reqs.is_empty());
});
Expand Down Expand Up @@ -303,9 +303,9 @@ fn repropose() {
// and schedule a prevote timeout for round 0.
let ret =
shc.handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).unwrap();
// Expect ScheduleTimeoutPrevote{round:0} and BroadcastVote(Precommit).
// Expect ScheduleTimeout(Step::Prevote, 0) and BroadcastVote(Precommit).
assert_matches!(ret, ShcReturn::Requests(mut reqs) => {
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeoutPrevote(0)));
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 0)));
assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit && v.round == 0);
assert!(reqs.is_empty());
});
Expand All @@ -315,7 +315,7 @@ fn repropose() {
let ret = shc.handle_vote(&leader_fn, precommit(None, 0, 0, *VALIDATOR_ID_2)).unwrap();
// assert that ret is ScheduleTimeoutPrecommit
assert_matches!(ret, ShcReturn::Requests(mut reqs) => {
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeoutPrecommit(0)));
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Precommit, 0)));
assert!(reqs.is_empty());
});
// No precommit quorum is reached. On TimeoutPrecommit(0) the proposer advances to round 1 with
Expand Down
14 changes: 5 additions & 9 deletions crates/apollo_consensus/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,8 @@ pub(crate) enum SMRequest {
StartValidateProposal(ProposalInit),
/// Request to broadcast a Prevote or Precommit vote.
BroadcastVote(Vote),
/// Request to schedule a TimeoutPropose.
ScheduleTimeoutPropose(Round),
/// Request to schedule a TimeoutPrevote.
ScheduleTimeoutPrevote(Round),
/// Request to schedule a TimeoutPrecommit.
ScheduleTimeoutPrecommit(Round),
/// Request to schedule a timeout for a specific step and round.
ScheduleTimeout(Step, Round),
/// Decision reached for the given proposal and round.
DecisionReached(ProposalCommitment, Round),
/// Request to re-propose (sent by the leader after advancing to a new round
Expand Down Expand Up @@ -511,7 +507,7 @@ impl StateMachine {
}
} else {
info!("START_ROUND_VALIDATOR: Starting round {round} as Validator");
VecDeque::from([SMRequest::ScheduleTimeoutPropose(self.round)])
VecDeque::from([SMRequest::ScheduleTimeout(Step::Propose, self.round)])
};
output.append(&mut self.current_round_upons());
output
Expand Down Expand Up @@ -625,7 +621,7 @@ impl StateMachine {
if !self.mixed_prevote_quorum.insert(self.round) {
return VecDeque::new();
}
VecDeque::from([SMRequest::ScheduleTimeoutPrevote(self.round)])
VecDeque::from([SMRequest::ScheduleTimeout(Step::Prevote, self.round)])
}

// LOC 36 in the paper.
Expand Down Expand Up @@ -684,7 +680,7 @@ impl StateMachine {
if !self.mixed_precommit_quorum.insert(self.round) {
return VecDeque::new();
}
VecDeque::from([SMRequest::ScheduleTimeoutPrecommit(self.round)])
VecDeque::from([SMRequest::ScheduleTimeout(Step::Precommit, self.round)])
}

// LOC 49 in the paper.
Expand Down
Loading
Loading