Skip to content

Commit 9066d2f

Browse files
apollo_consensus: refactor shc::handle_event into smaller helpers
1 parent 713c560 commit 9066d2f

File tree

1 file changed

+110
-72
lines changed

1 file changed

+110
-72
lines changed

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 110 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -149,85 +149,123 @@ impl SingleHeightConsensus {
149149
LeaderFn: Fn(Round) -> ValidatorId,
150150
{
151151
trace!("Received StateMachineEvent: {:?}", event);
152-
let ret = match event {
153-
StateMachineEvent::TimeoutPropose(_round)
154-
| StateMachineEvent::TimeoutPrevote(_round)
155-
| StateMachineEvent::TimeoutPrecommit(_round) => {
156-
let sm_requests = self.state_machine.handle_event(event, leader_fn);
157-
self.handle_state_machine_requests(sm_requests)
158-
}
159-
StateMachineEvent::RebroadcastVote(vote) => match vote.vote_type {
160-
VoteType::Prevote => {
161-
let Some(last_vote) = self.state_machine.last_self_prevote() else {
162-
return Err(ConsensusError::InternalInconsistency(
163-
"No prevote to send".to_string(),
164-
));
165-
};
166-
if last_vote.round > vote.round {
167-
// Only replay the newest prevote.
168-
return Ok(ShcReturn::Requests(VecDeque::new()));
169-
}
170-
trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}");
171-
Ok(ShcReturn::Requests(VecDeque::from([
172-
SMRequest::BroadcastVote(last_vote.clone()),
173-
SMRequest::ScheduleTimeoutRebroadcast(last_vote),
174-
])))
175-
}
176-
VoteType::Precommit => {
177-
let Some(last_vote) = self.state_machine.last_self_precommit() else {
178-
return Err(ConsensusError::InternalInconsistency(
179-
"No precommit to send".to_string(),
180-
));
181-
};
182-
if last_vote.round > vote.round {
183-
// Only replay the newest precommit.
184-
return Ok(ShcReturn::Requests(VecDeque::new()));
185-
}
186-
trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}");
187-
Ok(ShcReturn::Requests(VecDeque::from([
188-
SMRequest::BroadcastVote(last_vote.clone()),
189-
SMRequest::ScheduleTimeoutRebroadcast(last_vote),
190-
])))
191-
}
192-
},
152+
match event {
153+
StateMachineEvent::TimeoutPropose(_)
154+
| StateMachineEvent::TimeoutPrevote(_)
155+
| StateMachineEvent::TimeoutPrecommit(_) => self.handle_timeout_event(leader_fn, event),
156+
StateMachineEvent::RebroadcastVote(vote) => self.handle_rebroadcast_vote(vote),
193157
StateMachineEvent::FinishedValidation(proposal_id, round, _valid_round) => {
194-
if proposal_id.is_some() {
195-
CONSENSUS_PROPOSALS_VALIDATED.increment(1);
196-
} else {
197-
CONSENSUS_PROPOSALS_INVALID.increment(1);
198-
}
199-
200-
// Validation for this round finished; clear the pending marker.
201-
self.pending_validation_rounds.remove(&round);
202-
let requests = self.state_machine.handle_event(event, leader_fn);
203-
self.handle_state_machine_requests(requests)
158+
self.handle_finished_validation(leader_fn, proposal_id, round)
204159
}
205160
StateMachineEvent::FinishedBuilding(proposal_id, round) => {
206-
if proposal_id.is_none() {
207-
CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1);
208-
} else {
209-
CONSENSUS_BUILD_PROPOSAL_TOTAL.increment(1);
210-
}
211-
// Ensure SM has no proposal recorded yet for this round when proposing.
212-
assert!(
213-
self.state_machine.proposals_ref().get(&round).is_none(),
214-
"There should be no entry for round {round} when proposing"
215-
);
216-
217-
assert_eq!(
218-
round,
219-
self.state_machine.round(),
220-
"State machine should not progress while awaiting proposal"
221-
);
222-
debug!(%round, proposal_commitment = ?proposal_id, "Built proposal.");
223-
let requests = self.state_machine.handle_event(event, &leader_fn);
224-
self.handle_state_machine_requests(requests)
161+
self.handle_finished_building(leader_fn, proposal_id, round)
225162
}
226163
StateMachineEvent::Prevote(_) | StateMachineEvent::Precommit(_) => {
227164
unreachable!("Peer votes must be handled via handle_vote")
228165
}
229-
};
230-
ret
166+
}
167+
}
168+
169+
fn handle_timeout_event<LeaderFn>(
170+
&mut self,
171+
leader_fn: &LeaderFn,
172+
event: StateMachineEvent,
173+
) -> Result<ShcReturn, ConsensusError>
174+
where
175+
LeaderFn: Fn(Round) -> ValidatorId,
176+
{
177+
let sm_requests = self.state_machine.handle_event(event, leader_fn);
178+
self.handle_state_machine_requests(sm_requests)
179+
}
180+
181+
fn handle_rebroadcast_vote(&mut self, vote: Vote) -> Result<ShcReturn, ConsensusError> {
182+
match vote.vote_type {
183+
VoteType::Prevote => {
184+
let Some(last_vote) = self.state_machine.last_self_prevote() else {
185+
return Err(ConsensusError::InternalInconsistency(
186+
"No prevote to send".to_string(),
187+
));
188+
};
189+
if last_vote.round > vote.round {
190+
// Only replay the newest prevote.
191+
return Ok(ShcReturn::Requests(VecDeque::new()));
192+
}
193+
trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}");
194+
Ok(ShcReturn::Requests(VecDeque::from([
195+
SMRequest::BroadcastVote(last_vote.clone()),
196+
SMRequest::ScheduleTimeoutRebroadcast(last_vote),
197+
])))
198+
}
199+
VoteType::Precommit => {
200+
let Some(last_vote) = self.state_machine.last_self_precommit() else {
201+
return Err(ConsensusError::InternalInconsistency(
202+
"No precommit to send".to_string(),
203+
));
204+
};
205+
if last_vote.round > vote.round {
206+
// Only replay the newest precommit.
207+
return Ok(ShcReturn::Requests(VecDeque::new()));
208+
}
209+
trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}");
210+
Ok(ShcReturn::Requests(VecDeque::from([
211+
SMRequest::BroadcastVote(last_vote.clone()),
212+
SMRequest::ScheduleTimeoutRebroadcast(last_vote),
213+
])))
214+
}
215+
}
216+
}
217+
218+
fn handle_finished_validation<LeaderFn>(
219+
&mut self,
220+
leader_fn: &LeaderFn,
221+
proposal_id: Option<ProposalCommitment>,
222+
round: Round,
223+
) -> Result<ShcReturn, ConsensusError>
224+
where
225+
LeaderFn: Fn(Round) -> ValidatorId,
226+
{
227+
if proposal_id.is_some() {
228+
CONSENSUS_PROPOSALS_VALIDATED.increment(1);
229+
} else {
230+
CONSENSUS_PROPOSALS_INVALID.increment(1);
231+
}
232+
// Validation finished; clear the pending marker for this round.
233+
self.pending_validation_rounds.remove(&round);
234+
let requests = self.state_machine.handle_event(
235+
StateMachineEvent::FinishedValidation(proposal_id, round, None),
236+
leader_fn,
237+
);
238+
self.handle_state_machine_requests(requests)
239+
}
240+
241+
fn handle_finished_building<LeaderFn>(
242+
&mut self,
243+
leader_fn: &LeaderFn,
244+
proposal_id: Option<ProposalCommitment>,
245+
round: Round,
246+
) -> Result<ShcReturn, ConsensusError>
247+
where
248+
LeaderFn: Fn(Round) -> ValidatorId,
249+
{
250+
if proposal_id.is_none() {
251+
CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1);
252+
} else {
253+
CONSENSUS_BUILD_PROPOSAL_TOTAL.increment(1);
254+
}
255+
// Ensure SM has no proposal recorded yet for this round when proposing.
256+
assert!(
257+
self.state_machine.proposals_ref().get(&round).is_none(),
258+
"There should be no entry for round {round} when proposing"
259+
);
260+
assert_eq!(
261+
round,
262+
self.state_machine.round(),
263+
"State machine should not progress while awaiting proposal"
264+
);
265+
let requests = self
266+
.state_machine
267+
.handle_event(StateMachineEvent::FinishedBuilding(proposal_id, round), leader_fn);
268+
self.handle_state_machine_requests(requests)
231269
}
232270

233271
/// Handle vote messages from peer nodes.

0 commit comments

Comments
 (0)