diff --git a/crates/apollo_consensus/src/manager.rs b/crates/apollo_consensus/src/manager.rs index 6ff313f3af4..79a6785e509 100644 --- a/crates/apollo_consensus/src/manager.rs +++ b/crates/apollo_consensus/src/manager.rs @@ -385,6 +385,39 @@ impl MultiHeightManager { CONSENSUS_BLOCK_NUMBER.set_lossy(height.0); self.cache.report_max_cached_block_number_metric(height); + if let Some(sync_result) = self.check_and_wait_for_sync(context, height).await? { + return Ok(sync_result); + } + + let (mut shc, mut shc_events) = + self.initialize_single_height_consensus(context, height).await; + + if let Some(decision) = self + .process_start_height(context, height, &mut shc, &mut shc_events, broadcast_channels) + .await? + { + error!("Decision reached before executing requests. {:?}", decision); + return Ok(RunHeightRes::Decision(decision)); + } + + self.process_consensus_events( + context, + height, + &mut shc, + &mut shc_events, + broadcast_channels, + proposals_receiver, + ) + .await + } + + /// Check if we need to sync and wait if necessary. + /// Returns Some(RunHeightRes::Sync) if sync is needed, None otherwise. + async fn check_and_wait_for_sync( + &mut self, + context: &mut ContextT, + height: BlockNumber, + ) -> Result, ConsensusError> { // If we already voted for this height, do not proceed until we sync to this height. // Otherwise, just check if we can sync to this height, immediately. If not, proceed with // consensus. @@ -400,11 +433,19 @@ impl MultiHeightManager { self.last_voted_height_at_initialization.unwrap().0 ); self.wait_until_sync_reaches_height(height, context).await; - return Ok(RunHeightRes::Sync); + return Ok(Some(RunHeightRes::Sync)); } else if context.try_sync(height).await { - return Ok(RunHeightRes::Sync); + return Ok(Some(RunHeightRes::Sync)); } + Ok(None) + } + /// Initialize consensus for a height: get validators, create SHC, and set up events. + async fn initialize_single_height_consensus( + &mut self, + context: &mut ContextT, + height: BlockNumber, + ) -> (SingleHeightConsensus, FuturesUnordered>) { let validators = context.validators(height).await; let is_observer = !validators.contains(&self.consensus_config.dynamic_config.validator_id); info!( @@ -412,7 +453,7 @@ impl MultiHeightManager { height, is_observer, validators, ); - let mut shc = SingleHeightConsensus::new( + let shc = SingleHeightConsensus::new( height, is_observer, self.consensus_config.dynamic_config.validator_id, @@ -420,27 +461,80 @@ impl MultiHeightManager { self.quorum_type, self.consensus_config.dynamic_config.timeouts.clone(), ); - let mut shc_events = FuturesUnordered::new(); + let shc_events = FuturesUnordered::new(); + + (shc, shc_events) + } + + /// Process the start of a height: call shc.start, process cached proposals/votes, and execute + /// initial requests. Returns Some(Decision) if a decision was reached (with error logging), + /// None otherwise. + async fn process_start_height( + &mut self, + context: &mut ContextT, + height: BlockNumber, + shc: &mut SingleHeightConsensus, + shc_events: &mut FuturesUnordered>, + broadcast_channels: &mut BroadcastVoteChannel, + ) -> Result, ConsensusError> { + self.cache.report_cached_votes_metric(height); + let mut pending_requests = { + let leader_fn = make_leader_fn(context, height); + match shc.start(&leader_fn)? { + ShcReturn::Decision(decision) => { + // Start should generate either StartValidateProposal (validator) or + // StartBuildProposal (proposer). We do not enforce this + // since the Manager is intentionally not meant to + // understand consensus in detail. + return Ok(Some(decision)); + } + ShcReturn::Requests(requests) => requests, + } + }; - match self.start_height(context, height, &mut shc).await? { - ShcReturn::Decision(decision) => { - return Ok(RunHeightRes::Decision(decision)); + let cached_proposals = self.cache.get_current_height_proposals(height); + trace!("Cached proposals for height {}: {:?}", height, cached_proposals); + for (init, content_receiver) in cached_proposals { + match self + .handle_proposal_known_init(context, height, shc, init, content_receiver) + .await? + { + ShcReturn::Decision(decision) => { + return Ok(Some(decision)); + } + ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests), } - ShcReturn::Requests(requests) => { - // Reflect initial height/round to context before executing requests. - context.set_height_and_round(height, shc.current_round()).await; - self.execute_requests( - context, - height, - requests, - &mut shc_events, - broadcast_channels, - ) - .await?; + } + + let cached_votes = self.cache.get_current_height_votes(height); + trace!("Cached votes for height {}: {:?}", height, cached_votes); + for msg in cached_votes { + let leader_fn = make_leader_fn(context, height); + match shc.handle_vote(&leader_fn, msg)? { + ShcReturn::Decision(decision) => { + return Ok(Some(decision)); + } + ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests), } } - // Loop over incoming proposals, messages, and self generated events. + // Reflect initial height/round to context before executing requests. + context.set_height_and_round(height, shc.current_round()).await; + self.execute_requests(context, height, pending_requests, shc_events, broadcast_channels) + .await?; + Ok(None) + } + + /// Main consensus loop: handles incoming proposals, votes, events, and sync checks. + async fn process_consensus_events( + &mut self, + context: &mut ContextT, + height: BlockNumber, + shc: &mut SingleHeightConsensus, + shc_events: &mut FuturesUnordered>, + broadcast_channels: &mut BroadcastVoteChannel, + proposals_receiver: &mut mpsc::Receiver>, + ) -> Result { let clock = DefaultClock; let sync_retry_interval = self.consensus_config.dynamic_config.sync_retry_interval; let mut sync_poll_deadline = clock.now() + sync_retry_interval; @@ -448,13 +542,13 @@ impl MultiHeightManager { self.cache.report_max_cached_block_number_metric(height); let shc_return = tokio::select! { message = broadcast_channels.broadcasted_messages_receiver.next() => { - self.handle_vote(context, height, Some(&mut shc), message, broadcast_channels).await? + self.handle_vote(context, height, Some(shc), message, broadcast_channels).await? }, content_receiver = proposals_receiver.next() => { self.handle_proposal( context, height, - Some(&mut shc), + Some(shc), content_receiver ) .await? @@ -482,7 +576,7 @@ impl MultiHeightManager { context, height, requests, - &mut shc_events, + shc_events, broadcast_channels, ) .await?; @@ -491,53 +585,6 @@ impl MultiHeightManager { } } - async fn start_height( - &mut self, - context: &mut ContextT, - height: BlockNumber, - shc: &mut SingleHeightConsensus, - ) -> Result { - self.cache.report_cached_votes_metric(height); - let mut pending_requests = { - let leader_fn = make_leader_fn(context, height); - match shc.start(&leader_fn)? { - decision @ ShcReturn::Decision(_) => { - // Start should generate either StartValidateProposal (validator) or - // StartBuildProposal (proposer). We do not enforce this - // since the Manager is intentionally not meant to - // understand consensus in detail. - error!("Decision reached at start of height. {:?}", decision); - return Ok(decision); - } - ShcReturn::Requests(requests) => requests, - } - }; - - let cached_proposals = self.cache.get_current_height_proposals(height); - trace!("Cached proposals for height {}: {:?}", height, cached_proposals); - for (init, content_receiver) in cached_proposals { - match self - .handle_proposal_known_init(context, height, shc, init, content_receiver) - .await? - { - decision @ ShcReturn::Decision(_) => return Ok(decision), - ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests), - } - } - - let cached_votes = self.cache.get_current_height_votes(height); - trace!("Cached votes for height {}: {:?}", height, cached_votes); - for msg in cached_votes { - let leader_fn = make_leader_fn(context, height); - match shc.handle_vote(&leader_fn, msg)? { - decision @ ShcReturn::Decision(_) => return Ok(decision), - ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests), - } - } - - Ok(ShcReturn::Requests(pending_requests)) - } - // Handle a new proposal receiver from the network. // shc - None if the height was just completed and we should drop the message. async fn handle_proposal(