Skip to content

Commit b9db4a7

Browse files
apollo_consensus: refactor shc::handle_event into smaller helpers
1 parent a9ec182 commit b9db4a7

File tree

1 file changed

+100
-68
lines changed

1 file changed

+100
-68
lines changed

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 100 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -149,82 +149,114 @@ impl SingleHeightConsensus {
149149
LeaderFn: Fn(Round) -> ValidatorId,
150150
{
151151
trace!("Received StateMachineEvent: {:?}", event);
152-
let ret = match event {
153-
StateMachineEvent::TimeoutPropose(_round)
154-
| StateMachineEvent::TimeoutPrevote(_round)
155-
| StateMachineEvent::TimeoutPrecommit(_round) => {
156-
let sm_requests = self.state_machine.handle_event(event, leader_fn);
157-
self.handle_state_machine_requests(sm_requests)
158-
}
159-
StateMachineEvent::VoteBroadcasted(vote) => {
160-
let last_vote = match vote.vote_type {
161-
VoteType::Prevote => {
162-
self.state_machine.last_self_prevote().ok_or_else(|| {
163-
ConsensusError::InternalInconsistency("No prevote to send".to_string())
164-
})?
165-
}
166-
VoteType::Precommit => {
167-
self.state_machine.last_self_precommit().ok_or_else(|| {
168-
ConsensusError::InternalInconsistency(
169-
"No precommit to send".to_string(),
170-
)
171-
})?
172-
}
173-
};
174-
if last_vote.round > vote.round {
175-
// Only rebroadcast the newest vote.
176-
return Ok(ShcReturn::Requests(VecDeque::new()));
177-
}
178-
assert_eq!(last_vote, vote);
179-
trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}");
180-
Ok(ShcReturn::Requests(VecDeque::from([SMRequest::BroadcastVote(last_vote)])))
181-
}
152+
match event {
153+
StateMachineEvent::TimeoutPropose(_)
154+
| StateMachineEvent::TimeoutPrevote(_)
155+
| StateMachineEvent::TimeoutPrecommit(_) => self.handle_timeout_event(leader_fn, event),
156+
StateMachineEvent::VoteBroadcasted(vote) => self.handle_vote_broadcasted(vote),
182157
StateMachineEvent::FinishedValidation(proposal_id, round, valid_round) => {
183-
debug!(
184-
proposer = %leader_fn(round),
185-
%round,
186-
?valid_round,
187-
proposal_commitment = ?proposal_id,
188-
node_round = self.state_machine.round(),
189-
"Validated proposal.",
190-
);
191-
if proposal_id.is_some() {
192-
CONSENSUS_PROPOSALS_VALIDATED.increment(1);
193-
} else {
194-
CONSENSUS_PROPOSALS_INVALID.increment(1);
195-
}
196-
197-
// Cleanup: validation for round {round} finished, so remove it from the pending
198-
// set. This doesn't affect logic.
199-
self.pending_validation_rounds.remove(&round);
200-
let requests = self.state_machine.handle_event(event, leader_fn);
201-
self.handle_state_machine_requests(requests)
158+
self.handle_finished_validation(leader_fn, proposal_id, round, valid_round)
202159
}
203160
StateMachineEvent::FinishedBuilding(proposal_id, round) => {
204-
if proposal_id.is_none() {
205-
CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1);
206-
}
207-
CONSENSUS_BUILD_PROPOSAL_TOTAL.increment(1);
208-
// Ensure SM has no proposal recorded yet for this round when proposing.
209-
assert!(
210-
!self.state_machine.has_proposal_for_round(round),
211-
"There should be no entry for round {round} when proposing"
212-
);
213-
214-
assert_eq!(
215-
round,
216-
self.state_machine.round(),
217-
"State machine should not progress while awaiting proposal"
218-
);
219-
debug!(%round, proposal_commitment = ?proposal_id, "Built proposal.");
220-
let requests = self.state_machine.handle_event(event, &leader_fn);
221-
self.handle_state_machine_requests(requests)
161+
self.handle_finished_building(leader_fn, proposal_id, round)
222162
}
223163
StateMachineEvent::Prevote(_) | StateMachineEvent::Precommit(_) => {
224164
unreachable!("Peer votes must be handled via handle_vote")
225165
}
166+
}
167+
}
168+
169+
fn handle_timeout_event<LeaderFn>(
170+
&mut self,
171+
leader_fn: &LeaderFn,
172+
event: StateMachineEvent,
173+
) -> Result<ShcReturn, ConsensusError>
174+
where
175+
LeaderFn: Fn(Round) -> ValidatorId,
176+
{
177+
let sm_requests = self.state_machine.handle_event(event, leader_fn);
178+
self.handle_state_machine_requests(sm_requests)
179+
}
180+
181+
fn handle_vote_broadcasted(&mut self, vote: Vote) -> Result<ShcReturn, ConsensusError> {
182+
let last_vote = match vote.vote_type {
183+
VoteType::Prevote => self.state_machine.last_self_prevote().ok_or_else(|| {
184+
ConsensusError::InternalInconsistency("No prevote to send".to_string())
185+
})?,
186+
VoteType::Precommit => self.state_machine.last_self_precommit().ok_or_else(|| {
187+
ConsensusError::InternalInconsistency("No precommit to send".to_string())
188+
})?,
226189
};
227-
ret
190+
if last_vote.round > vote.round {
191+
// Only rebroadcast the newest vote.
192+
return Ok(ShcReturn::Requests(VecDeque::new()));
193+
}
194+
assert_eq!(last_vote, vote);
195+
trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}");
196+
Ok(ShcReturn::Requests(VecDeque::from([SMRequest::BroadcastVote(last_vote)])))
197+
}
198+
199+
fn handle_finished_validation<LeaderFn>(
200+
&mut self,
201+
leader_fn: &LeaderFn,
202+
proposal_id: Option<ProposalCommitment>,
203+
round: Round,
204+
valid_round: Option<Round>,
205+
) -> Result<ShcReturn, ConsensusError>
206+
where
207+
LeaderFn: Fn(Round) -> ValidatorId,
208+
{
209+
debug!(
210+
proposer = %leader_fn(round),
211+
%round,
212+
?valid_round,
213+
proposal_commitment = ?proposal_id,
214+
node_round = self.state_machine.round(),
215+
"Validated proposal.",
216+
);
217+
if proposal_id.is_some() {
218+
CONSENSUS_PROPOSALS_VALIDATED.increment(1);
219+
} else {
220+
CONSENSUS_PROPOSALS_INVALID.increment(1);
221+
}
222+
// Cleanup: validation for round {round} finished, so remove it from the pending
223+
// set. This doesn't affect logic.
224+
self.pending_validation_rounds.remove(&round);
225+
let requests = self.state_machine.handle_event(
226+
StateMachineEvent::FinishedValidation(proposal_id, round, None),
227+
leader_fn,
228+
);
229+
self.handle_state_machine_requests(requests)
230+
}
231+
232+
fn handle_finished_building<LeaderFn>(
233+
&mut self,
234+
leader_fn: &LeaderFn,
235+
proposal_id: Option<ProposalCommitment>,
236+
round: Round,
237+
) -> Result<ShcReturn, ConsensusError>
238+
where
239+
LeaderFn: Fn(Round) -> ValidatorId,
240+
{
241+
if proposal_id.is_none() {
242+
CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1);
243+
}
244+
CONSENSUS_BUILD_PROPOSAL_TOTAL.increment(1);
245+
// Ensure SM has no proposal recorded yet for this round when proposing.
246+
assert!(
247+
!self.state_machine.has_proposal_for_round(round),
248+
"There should be no entry for round {round} when proposing"
249+
);
250+
assert_eq!(
251+
round,
252+
self.state_machine.round(),
253+
"State machine should not progress while awaiting proposal"
254+
);
255+
debug!(%round, proposal_commitment = ?proposal_id, "Built proposal.");
256+
let requests = self
257+
.state_machine
258+
.handle_event(StateMachineEvent::FinishedBuilding(proposal_id, round), leader_fn);
259+
self.handle_state_machine_requests(requests)
228260
}
229261

230262
/// Handle vote messages from peer nodes.

0 commit comments

Comments
 (0)