Skip to content

Commit 9b240b3

Browse files
apollo_consensus: unify timeout scheduling requests into single enum variant
1 parent a2a3a88 commit 9b240b3

File tree

4 files changed

+86
-77
lines changed

4 files changed

+86
-77
lines changed

crates/apollo_consensus/src/manager.rs

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::metrics::{
3939
CONSENSUS_REPROPOSALS,
4040
};
4141
use crate::single_height_consensus::{ShcReturn, SingleHeightConsensus};
42-
use crate::state_machine::{SMRequest, StateMachineEvent};
42+
use crate::state_machine::{SMRequest, StateMachineEvent, Step};
4343
use crate::storage::HeightVotedStorageTrait;
4444
use crate::types::{
4545
BroadcastVoteChannel,
@@ -785,29 +785,24 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
785785
.boxed();
786786
Ok(Some(fut))
787787
}
788-
SMRequest::ScheduleTimeoutPropose(round) => {
789-
let duration = timeouts.get_proposal_timeout(round);
790-
let fut = async move {
791-
tokio::time::sleep(duration).await;
792-
StateMachineEvent::TimeoutPropose(round)
793-
}
794-
.boxed();
795-
Ok(Some(fut))
796-
}
797-
SMRequest::ScheduleTimeoutPrevote(round) => {
798-
let duration = timeouts.get_prevote_timeout(round);
799-
let fut = async move {
800-
tokio::time::sleep(duration).await;
801-
StateMachineEvent::TimeoutPrevote(round)
802-
}
803-
.boxed();
804-
Ok(Some(fut))
805-
}
806-
SMRequest::ScheduleTimeoutPrecommit(round) => {
807-
let duration = timeouts.get_precommit_timeout(round);
788+
SMRequest::ScheduleTimeout(step, round) => {
789+
let (duration, event) = match step {
790+
Step::Propose => (
791+
timeouts.get_proposal_timeout(round),
792+
StateMachineEvent::TimeoutPropose(round),
793+
),
794+
Step::Prevote => (
795+
timeouts.get_prevote_timeout(round),
796+
StateMachineEvent::TimeoutPrevote(round),
797+
),
798+
Step::Precommit => (
799+
timeouts.get_precommit_timeout(round),
800+
StateMachineEvent::TimeoutPrecommit(round),
801+
),
802+
};
808803
let fut = async move {
809804
tokio::time::sleep(duration).await;
810-
StateMachineEvent::TimeoutPrecommit(round)
805+
event
811806
}
812807
.boxed();
813808
Ok(Some(fut))

crates/apollo_consensus/src/single_height_consensus_test.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use test_case::test_case;
88

99
use super::SingleHeightConsensus;
1010
use crate::single_height_consensus::ShcReturn;
11-
use crate::state_machine::{SMRequest, StateMachineEvent};
11+
use crate::state_machine::{SMRequest, StateMachineEvent, Step};
1212
use crate::test_utils::{precommit, prevote, TestBlock};
1313
use crate::types::{ProposalCommitment, ValidatorId};
1414
use crate::votes_threshold::QuorumType;
@@ -63,7 +63,7 @@ fn proposer() {
6363
shc.handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).unwrap();
6464
// Expect a precommit broadcast request present.
6565
assert_matches!(ret, ShcReturn::Requests(mut reqs) => {
66-
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeoutPrevote(0)));
66+
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 0)));
6767
assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit);
6868
assert!(reqs.is_empty());
6969
});
@@ -125,7 +125,7 @@ fn validator(repeat_proposal: bool) {
125125
shc.handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_3)).unwrap();
126126
// Expect a precommit broadcast request present.
127127
assert_matches!(ret, ShcReturn::Requests(mut reqs) => {
128-
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeoutPrevote(0)));
128+
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 0)));
129129
assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit);
130130
assert!(reqs.is_empty());
131131
});
@@ -169,7 +169,7 @@ fn vote_twice(same_vote: bool) {
169169
.unwrap();
170170
// On quorum of prevotes, expect a precommit broadcast request.
171171
assert_matches!(res, ShcReturn::Requests(mut reqs) => {
172-
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeoutPrevote(0)));
172+
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 0)));
173173
assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit);
174174
assert!(reqs.is_empty());
175175
});
@@ -227,7 +227,7 @@ fn rebroadcast_votes() {
227227
shc.handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).unwrap();
228228
// Expect a precommit broadcast at round 0.
229229
assert_matches!(ret, ShcReturn::Requests(mut reqs) => {
230-
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeoutPrevote(0)));
230+
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 0)));
231231
assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit && v.round == 0);
232232
assert!(reqs.is_empty());
233233
});
@@ -250,7 +250,7 @@ fn rebroadcast_votes() {
250250
let ret =
251251
shc.handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), 0, 1, *VALIDATOR_ID_3)).unwrap();
252252
assert_matches!(ret, ShcReturn::Requests(mut reqs) => {
253-
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeoutPrevote(1)));
253+
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 1)));
254254
assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit && v.round == 1);
255255
assert!(reqs.is_empty());
256256
});
@@ -303,9 +303,9 @@ fn repropose() {
303303
// and schedule a prevote timeout for round 0.
304304
let ret =
305305
shc.handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).unwrap();
306-
// Expect ScheduleTimeoutPrevote{round:0} and BroadcastVote(Precommit).
306+
// Expect ScheduleTimeout(Step::Prevote, 0) and BroadcastVote(Precommit).
307307
assert_matches!(ret, ShcReturn::Requests(mut reqs) => {
308-
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeoutPrevote(0)));
308+
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 0)));
309309
assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit && v.round == 0);
310310
assert!(reqs.is_empty());
311311
});
@@ -315,7 +315,7 @@ fn repropose() {
315315
let ret = shc.handle_vote(&leader_fn, precommit(None, 0, 0, *VALIDATOR_ID_2)).unwrap();
316316
// assert that ret is ScheduleTimeoutPrecommit
317317
assert_matches!(ret, ShcReturn::Requests(mut reqs) => {
318-
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeoutPrecommit(0)));
318+
assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Precommit, 0)));
319319
assert!(reqs.is_empty());
320320
});
321321
// No precommit quorum is reached. On TimeoutPrecommit(0) the proposer advances to round 1 with

crates/apollo_consensus/src/state_machine.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,8 @@ pub(crate) enum SMRequest {
6767
StartValidateProposal(ProposalInit),
6868
/// Request to broadcast a Prevote or Precommit vote.
6969
BroadcastVote(Vote),
70-
/// Request to schedule a TimeoutPropose.
71-
ScheduleTimeoutPropose(Round),
72-
/// Request to schedule a TimeoutPrevote.
73-
ScheduleTimeoutPrevote(Round),
74-
/// Request to schedule a TimeoutPrecommit.
75-
ScheduleTimeoutPrecommit(Round),
70+
/// Request to schedule a timeout for a specific step and round.
71+
ScheduleTimeout(Step, Round),
7672
/// Decision reached for the given proposal and round.
7773
DecisionReached(ProposalCommitment, Round),
7874
/// Request to re-propose (sent by the leader after advancing to a new round
@@ -511,7 +507,7 @@ impl StateMachine {
511507
}
512508
} else {
513509
info!("START_ROUND_VALIDATOR: Starting round {round} as Validator");
514-
VecDeque::from([SMRequest::ScheduleTimeoutPropose(self.round)])
510+
VecDeque::from([SMRequest::ScheduleTimeout(Step::Propose, self.round)])
515511
};
516512
output.append(&mut self.current_round_upons());
517513
output
@@ -625,7 +621,7 @@ impl StateMachine {
625621
if !self.mixed_prevote_quorum.insert(self.round) {
626622
return VecDeque::new();
627623
}
628-
VecDeque::from([SMRequest::ScheduleTimeoutPrevote(self.round)])
624+
VecDeque::from([SMRequest::ScheduleTimeout(Step::Prevote, self.round)])
629625
}
630626

631627
// LOC 36 in the paper.
@@ -684,7 +680,7 @@ impl StateMachine {
684680
if !self.mixed_precommit_quorum.insert(self.round) {
685681
return VecDeque::new();
686682
}
687-
VecDeque::from([SMRequest::ScheduleTimeoutPrecommit(self.round)])
683+
VecDeque::from([SMRequest::ScheduleTimeout(Step::Precommit, self.round)])
688684
}
689685

690686
// LOC 49 in the paper.

0 commit comments

Comments
 (0)