Skip to content

Commit 7aa75be

Browse files
apollo_consensus: refactor shc::handle_event into smaller helpers
1 parent 953660e commit 7aa75be

File tree

1 file changed

+100
-68
lines changed

1 file changed

+100
-68
lines changed

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 100 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -148,82 +148,114 @@ impl SingleHeightConsensus {
148148
LeaderFn: Fn(Round) -> ValidatorId,
149149
{
150150
trace!("Received StateMachineEvent: {:?}", event);
151-
let ret = match event {
152-
StateMachineEvent::TimeoutPropose(_round)
153-
| StateMachineEvent::TimeoutPrevote(_round)
154-
| StateMachineEvent::TimeoutPrecommit(_round) => {
155-
let sm_requests = self.state_machine.handle_event(event, leader_fn);
156-
self.handle_state_machine_requests(sm_requests)
157-
}
158-
StateMachineEvent::VoteBroadcasted(vote) => {
159-
let last_vote = match vote.vote_type {
160-
VoteType::Prevote => {
161-
self.state_machine.last_self_prevote().ok_or_else(|| {
162-
ConsensusError::InternalInconsistency("No prevote to send".to_string())
163-
})?
164-
}
165-
VoteType::Precommit => {
166-
self.state_machine.last_self_precommit().ok_or_else(|| {
167-
ConsensusError::InternalInconsistency(
168-
"No precommit to send".to_string(),
169-
)
170-
})?
171-
}
172-
};
173-
if last_vote.round > vote.round {
174-
// Only rebroadcast the newest vote.
175-
return Ok(ShcReturn::Requests(VecDeque::new()));
176-
}
177-
assert_eq!(last_vote, vote);
178-
trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}");
179-
Ok(ShcReturn::Requests(VecDeque::from([SMRequest::BroadcastVote(last_vote)])))
180-
}
151+
match event {
152+
StateMachineEvent::TimeoutPropose(_)
153+
| StateMachineEvent::TimeoutPrevote(_)
154+
| StateMachineEvent::TimeoutPrecommit(_) => self.handle_timeout_event(leader_fn, event),
155+
StateMachineEvent::VoteBroadcasted(vote) => self.handle_vote_broadcasted(vote),
181156
StateMachineEvent::FinishedValidation(proposal_id, round, valid_round) => {
182-
debug!(
183-
proposer = %leader_fn(round),
184-
%round,
185-
?valid_round,
186-
proposal_commitment = ?proposal_id,
187-
node_round = self.state_machine.round(),
188-
"Validated proposal.",
189-
);
190-
if proposal_id.is_some() {
191-
CONSENSUS_PROPOSALS_VALIDATED.increment(1);
192-
} else {
193-
CONSENSUS_PROPOSALS_INVALID.increment(1);
194-
}
195-
196-
// Cleanup: validation for round {round} finished, so remove it from the pending
197-
// set. This doesn't affect logic.
198-
self.pending_validation_rounds.remove(&round);
199-
let requests = self.state_machine.handle_event(event, leader_fn);
200-
self.handle_state_machine_requests(requests)
157+
self.handle_finished_validation(leader_fn, proposal_id, round, valid_round)
201158
}
202159
StateMachineEvent::FinishedBuilding(proposal_id, round) => {
203-
if proposal_id.is_none() {
204-
CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1);
205-
}
206-
CONSENSUS_BUILD_PROPOSAL_TOTAL.increment(1);
207-
// Ensure SM has no proposal recorded yet for this round when proposing.
208-
assert!(
209-
!self.state_machine.has_proposal_for_round(round),
210-
"There should be no entry for round {round} when proposing"
211-
);
212-
213-
assert_eq!(
214-
round,
215-
self.state_machine.round(),
216-
"State machine should not progress while awaiting proposal"
217-
);
218-
debug!(%round, proposal_commitment = ?proposal_id, "Built proposal.");
219-
let requests = self.state_machine.handle_event(event, &leader_fn);
220-
self.handle_state_machine_requests(requests)
160+
self.handle_finished_building(leader_fn, proposal_id, round)
221161
}
222162
StateMachineEvent::Prevote(_) | StateMachineEvent::Precommit(_) => {
223163
unreachable!("Peer votes must be handled via handle_vote")
224164
}
165+
}
166+
}
167+
168+
fn handle_timeout_event<LeaderFn>(
169+
&mut self,
170+
leader_fn: &LeaderFn,
171+
event: StateMachineEvent,
172+
) -> Result<ShcReturn, ConsensusError>
173+
where
174+
LeaderFn: Fn(Round) -> ValidatorId,
175+
{
176+
let sm_requests = self.state_machine.handle_event(event, leader_fn);
177+
self.handle_state_machine_requests(sm_requests)
178+
}
179+
180+
fn handle_vote_broadcasted(&mut self, vote: Vote) -> Result<ShcReturn, ConsensusError> {
181+
let last_vote = match vote.vote_type {
182+
VoteType::Prevote => self.state_machine.last_self_prevote().ok_or_else(|| {
183+
ConsensusError::InternalInconsistency("No prevote to send".to_string())
184+
})?,
185+
VoteType::Precommit => self.state_machine.last_self_precommit().ok_or_else(|| {
186+
ConsensusError::InternalInconsistency("No precommit to send".to_string())
187+
})?,
225188
};
226-
ret
189+
if last_vote.round > vote.round {
190+
// Only rebroadcast the newest vote.
191+
return Ok(ShcReturn::Requests(VecDeque::new()));
192+
}
193+
assert_eq!(last_vote, vote);
194+
trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}");
195+
Ok(ShcReturn::Requests(VecDeque::from([SMRequest::BroadcastVote(last_vote)])))
196+
}
197+
198+
fn handle_finished_validation<LeaderFn>(
199+
&mut self,
200+
leader_fn: &LeaderFn,
201+
proposal_id: Option<ProposalCommitment>,
202+
round: Round,
203+
valid_round: Option<Round>,
204+
) -> Result<ShcReturn, ConsensusError>
205+
where
206+
LeaderFn: Fn(Round) -> ValidatorId,
207+
{
208+
debug!(
209+
proposer = %leader_fn(round),
210+
%round,
211+
?valid_round,
212+
proposal_commitment = ?proposal_id,
213+
node_round = self.state_machine.round(),
214+
"Validated proposal.",
215+
);
216+
if proposal_id.is_some() {
217+
CONSENSUS_PROPOSALS_VALIDATED.increment(1);
218+
} else {
219+
CONSENSUS_PROPOSALS_INVALID.increment(1);
220+
}
221+
// Cleanup: validation for round {round} finished, so remove it from the pending
222+
// set. This doesn't affect logic.
223+
self.pending_validation_rounds.remove(&round);
224+
let requests = self.state_machine.handle_event(
225+
StateMachineEvent::FinishedValidation(proposal_id, round, None),
226+
leader_fn,
227+
);
228+
self.handle_state_machine_requests(requests)
229+
}
230+
231+
fn handle_finished_building<LeaderFn>(
232+
&mut self,
233+
leader_fn: &LeaderFn,
234+
proposal_id: Option<ProposalCommitment>,
235+
round: Round,
236+
) -> Result<ShcReturn, ConsensusError>
237+
where
238+
LeaderFn: Fn(Round) -> ValidatorId,
239+
{
240+
if proposal_id.is_none() {
241+
CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1);
242+
}
243+
CONSENSUS_BUILD_PROPOSAL_TOTAL.increment(1);
244+
// Ensure SM has no proposal recorded yet for this round when proposing.
245+
assert!(
246+
!self.state_machine.has_proposal_for_round(round),
247+
"There should be no entry for round {round} when proposing"
248+
);
249+
assert_eq!(
250+
round,
251+
self.state_machine.round(),
252+
"State machine should not progress while awaiting proposal"
253+
);
254+
debug!(%round, proposal_commitment = ?proposal_id, "Built proposal.");
255+
let requests = self
256+
.state_machine
257+
.handle_event(StateMachineEvent::FinishedBuilding(proposal_id, round), leader_fn);
258+
self.handle_state_machine_requests(requests)
227259
}
228260

229261
/// Handle vote messages from peer nodes.

0 commit comments

Comments
 (0)