From d8d714f80414b4cf3f966f7e0b2d766efedbca9d Mon Sep 17 00:00:00 2001 From: Asmaa Magdoub Date: Sun, 23 Nov 2025 11:57:22 +0200 Subject: [PATCH] apollo_consensus: refactor shc::handle_event into smaller helpers --- .../src/single_height_consensus.rs | 168 +++++++++++------- 1 file changed, 100 insertions(+), 68 deletions(-) diff --git a/crates/apollo_consensus/src/single_height_consensus.rs b/crates/apollo_consensus/src/single_height_consensus.rs index 308d3422766..4d35e80c97f 100644 --- a/crates/apollo_consensus/src/single_height_consensus.rs +++ b/crates/apollo_consensus/src/single_height_consensus.rs @@ -148,82 +148,114 @@ impl SingleHeightConsensus { LeaderFn: Fn(Round) -> ValidatorId, { trace!("Received StateMachineEvent: {:?}", event); - let ret = match event { - StateMachineEvent::TimeoutPropose(_round) - | StateMachineEvent::TimeoutPrevote(_round) - | StateMachineEvent::TimeoutPrecommit(_round) => { - let sm_requests = self.state_machine.handle_event(event, leader_fn); - self.handle_state_machine_requests(sm_requests) - } - StateMachineEvent::VoteBroadcasted(vote) => { - let last_vote = match vote.vote_type { - VoteType::Prevote => { - self.state_machine.last_self_prevote().ok_or_else(|| { - ConsensusError::InternalInconsistency("No prevote to send".to_string()) - })? - } - VoteType::Precommit => { - self.state_machine.last_self_precommit().ok_or_else(|| { - ConsensusError::InternalInconsistency( - "No precommit to send".to_string(), - ) - })? - } - }; - if last_vote.round > vote.round { - // Only rebroadcast the newest vote. - return Ok(ShcReturn::Requests(VecDeque::new())); - } - assert_eq!(last_vote, vote); - trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}"); - Ok(ShcReturn::Requests(VecDeque::from([SMRequest::BroadcastVote(last_vote)]))) - } + match event { + StateMachineEvent::TimeoutPropose(_) + | StateMachineEvent::TimeoutPrevote(_) + | StateMachineEvent::TimeoutPrecommit(_) => self.handle_timeout_event(leader_fn, event), + StateMachineEvent::VoteBroadcasted(vote) => self.handle_vote_broadcasted(vote), StateMachineEvent::FinishedValidation(proposal_id, round, valid_round) => { - debug!( - proposer = %leader_fn(round), - %round, - ?valid_round, - proposal_commitment = ?proposal_id, - node_round = self.state_machine.round(), - "Validated proposal.", - ); - if proposal_id.is_some() { - CONSENSUS_PROPOSALS_VALIDATED.increment(1); - } else { - CONSENSUS_PROPOSALS_INVALID.increment(1); - } - - // Cleanup: validation for round {round} finished, so remove it from the pending - // set. This doesn't affect logic. - self.pending_validation_rounds.remove(&round); - let requests = self.state_machine.handle_event(event, leader_fn); - self.handle_state_machine_requests(requests) + self.handle_finished_validation(leader_fn, proposal_id, round, valid_round) } StateMachineEvent::FinishedBuilding(proposal_id, round) => { - if proposal_id.is_none() { - CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1); - } - CONSENSUS_BUILD_PROPOSAL_TOTAL.increment(1); - // Ensure SM has no proposal recorded yet for this round when proposing. - assert!( - !self.state_machine.has_proposal_for_round(round), - "There should be no entry for round {round} when proposing" - ); - - assert_eq!( - round, - self.state_machine.round(), - "State machine should not progress while awaiting proposal" - ); - debug!(%round, proposal_commitment = ?proposal_id, "Built proposal."); - let requests = self.state_machine.handle_event(event, &leader_fn); - self.handle_state_machine_requests(requests) + self.handle_finished_building(leader_fn, proposal_id, round) } StateMachineEvent::Prevote(_) | StateMachineEvent::Precommit(_) => { unreachable!("Peer votes must be handled via handle_vote") } + } + } + + fn handle_timeout_event( + &mut self, + leader_fn: &LeaderFn, + event: StateMachineEvent, + ) -> Result + where + LeaderFn: Fn(Round) -> ValidatorId, + { + let sm_requests = self.state_machine.handle_event(event, leader_fn); + self.handle_state_machine_requests(sm_requests) + } + + fn handle_vote_broadcasted(&mut self, vote: Vote) -> Result { + let last_vote = match vote.vote_type { + VoteType::Prevote => self.state_machine.last_self_prevote().ok_or_else(|| { + ConsensusError::InternalInconsistency("No prevote to send".to_string()) + })?, + VoteType::Precommit => self.state_machine.last_self_precommit().ok_or_else(|| { + ConsensusError::InternalInconsistency("No precommit to send".to_string()) + })?, }; - ret + if last_vote.round > vote.round { + // Only rebroadcast the newest vote. + return Ok(ShcReturn::Requests(VecDeque::new())); + } + assert_eq!(last_vote, vote); + trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}"); + Ok(ShcReturn::Requests(VecDeque::from([SMRequest::BroadcastVote(last_vote)]))) + } + + fn handle_finished_validation( + &mut self, + leader_fn: &LeaderFn, + proposal_id: Option, + round: Round, + valid_round: Option, + ) -> Result + where + LeaderFn: Fn(Round) -> ValidatorId, + { + debug!( + proposer = %leader_fn(round), + %round, + ?valid_round, + proposal_commitment = ?proposal_id, + node_round = self.state_machine.round(), + "Validated proposal.", + ); + if proposal_id.is_some() { + CONSENSUS_PROPOSALS_VALIDATED.increment(1); + } else { + CONSENSUS_PROPOSALS_INVALID.increment(1); + } + // Cleanup: validation for round {round} finished, so remove it from the pending + // set. This doesn't affect logic. + self.pending_validation_rounds.remove(&round); + let requests = self.state_machine.handle_event( + StateMachineEvent::FinishedValidation(proposal_id, round, None), + leader_fn, + ); + self.handle_state_machine_requests(requests) + } + + fn handle_finished_building( + &mut self, + leader_fn: &LeaderFn, + proposal_id: Option, + round: Round, + ) -> Result + where + LeaderFn: Fn(Round) -> ValidatorId, + { + if proposal_id.is_none() { + CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1); + } + CONSENSUS_BUILD_PROPOSAL_TOTAL.increment(1); + // Ensure SM has no proposal recorded yet for this round when proposing. + assert!( + !self.state_machine.has_proposal_for_round(round), + "There should be no entry for round {round} when proposing" + ); + assert_eq!( + round, + self.state_machine.round(), + "State machine should not progress while awaiting proposal" + ); + debug!(%round, proposal_commitment = ?proposal_id, "Built proposal."); + let requests = self + .state_machine + .handle_event(StateMachineEvent::FinishedBuilding(proposal_id, round), leader_fn); + self.handle_state_machine_requests(requests) } /// Handle vote messages from peer nodes.