diff --git a/crates/apollo_consensus/src/manager.rs b/crates/apollo_consensus/src/manager.rs index e301b0f12af..3310453a769 100644 --- a/crates/apollo_consensus/src/manager.rs +++ b/crates/apollo_consensus/src/manager.rs @@ -13,7 +13,11 @@ use std::collections::{BTreeMap, VecDeque}; use std::sync::{Arc, Mutex}; use apollo_config_manager_types::communication::SharedConfigManagerClient; -use apollo_consensus_config::config::{ConsensusConfig, ConsensusDynamicConfig}; +use apollo_consensus_config::config::{ + ConsensusConfig, + ConsensusDynamicConfig, + FutureMsgLimitsConfig, +}; use apollo_infra_utils::debug_every_n_sec; use apollo_network::network_manager::BroadcastTopicClientTrait; use apollo_network_types::network_types::BroadcastedMessageMetadata; @@ -180,11 +184,22 @@ struct ConsensusCache { // Mapping: { Height : { Round : (Init, Receiver)}} future_proposals_cache: BTreeMap>>, + /// Configuration for determining which messages should be cached. + future_msg_limit: FutureMsgLimitsConfig, } impl ConsensusCache { - fn new() -> Self { - Self { future_votes: BTreeMap::new(), future_proposals_cache: BTreeMap::new() } + fn new(future_msg_limit: FutureMsgLimitsConfig) -> Self { + Self { + future_votes: BTreeMap::new(), + future_proposals_cache: BTreeMap::new(), + future_msg_limit, + } + } + + /// Update the future message limits configuration. + fn set_future_msg_limit(&mut self, future_msg_limit: FutureMsgLimitsConfig) { + self.future_msg_limit = future_msg_limit; } /// Filters the cached messages: @@ -265,6 +280,65 @@ impl ConsensusCache { self.future_votes.get(&height).map(|votes| votes.len()).unwrap_or(0); CONSENSUS_CACHED_VOTES.set_lossy(cached_votes_count); } + + fn should_cache_msg( + &self, + current_height: &BlockNumber, + current_round: Round, + msg_height: BlockNumber, + msg_round: Round, + msg_description: &str, + ) -> bool { + let limits = &self.future_msg_limit; + let height_diff = msg_height.saturating_sub(current_height.0); + + let should_cache = height_diff <= limits.future_height_limit.into() + // For current height, check against current round + future_round_limit + && (height_diff == 0 && msg_round <= current_round + limits.future_round_limit + // For future heights, check absolute round limit + || height_diff > 0 && msg_round <= limits.future_height_round_limit); + + if !should_cache { + warn!( + "Dropping {} for height={} round={} when current_height={} current_round={} - \ + limits: future_height={}, future_height_round={}, future_round={}", + msg_description, + msg_height, + msg_round, + current_height, + current_round, + limits.future_height_limit, + limits.future_height_round_limit, + limits.future_round_limit + ); + } + + should_cache + } + + fn should_cache_proposal( + &self, + current_height: &BlockNumber, + current_round: Round, + proposal: &ProposalInit, + ) -> bool { + self.should_cache_msg( + current_height, + current_round, + proposal.height, + proposal.round, + "proposal", + ) + } + + fn should_cache_vote( + &self, + current_height: &BlockNumber, + current_round: Round, + vote: &Vote, + ) -> bool { + self.should_cache_msg(current_height, current_round, vote.height, vote.round, "vote") + } } /// Runs Tendermint repeatedly across different heights. Handles issues which are not explicitly @@ -277,8 +351,6 @@ struct MultiHeightManager { // The reason for this Arc we cannot share this instance mutably with // SingleHeightConsensus despite them not ever using it at the same time in a simpler way, due // rust limitations. - // TODO(guy.f): Remove in the following PR. - #[allow(dead_code)] voted_height_storage: Arc>, // Proposal content streams keyed by (height, round) current_height_proposals_streams: @@ -298,18 +370,20 @@ impl MultiHeightManager { .expect("Lock should never be poisoned") .get_prev_voted_height() .expect("Failed to get previous voted height from storage"); + let future_msg_limit = consensus_config.dynamic_config.future_msg_limit; Self { consensus_config, quorum_type, last_voted_height_at_initialization, voted_height_storage, current_height_proposals_streams: BTreeMap::new(), - cache: ConsensusCache::new(), + cache: ConsensusCache::new(future_msg_limit), } } /// Apply the full dynamic consensus configuration. Call only between heights. pub(crate) fn set_dynamic_config(&mut self, cfg: ConsensusDynamicConfig) { + self.cache.set_future_msg_limit(cfg.future_msg_limit); self.consensus_config.dynamic_config = cfg; } @@ -616,7 +690,7 @@ impl MultiHeightManager { match proposal_init.height.cmp(&height) { std::cmp::Ordering::Greater => { - if self.should_cache_proposal(&height, 0, &proposal_init) { + if self.cache.should_cache_proposal(&height, 0, &proposal_init) { debug!("Received a proposal for a future height. {:?}", proposal_init); // Note: new proposals with the same height/round will be ignored. // @@ -636,7 +710,11 @@ impl MultiHeightManager { } std::cmp::Ordering::Equal => match shc { Some(shc) => { - if self.should_cache_proposal(&height, shc.current_round(), &proposal_init) { + if self.cache.should_cache_proposal( + &height, + shc.current_round(), + &proposal_init, + ) { self.handle_proposal_known_init( context, height, @@ -717,7 +795,7 @@ impl MultiHeightManager { // 2. Parallel proposals - we may send/receive a proposal for (H+1, 0). match message.height.cmp(&height) { std::cmp::Ordering::Greater => { - if self.should_cache_vote(&height, 0, &message) { + if self.cache.should_cache_vote(&height, 0, &message) { trace!("Cache message for a future height. {:?}", message); self.cache.cache_future_vote(message); } @@ -729,7 +807,7 @@ impl MultiHeightManager { } std::cmp::Ordering::Equal => match shc { Some(shc) => { - if self.should_cache_vote(&height, shc.current_round(), &message) { + if self.cache.should_cache_vote(&height, shc.current_round(), &message) { let leader_fn = make_leader_fn(context, height); shc.handle_vote(&leader_fn, message) } else { @@ -869,65 +947,6 @@ impl MultiHeightManager { } } } - - fn should_cache_msg( - &self, - current_height: &BlockNumber, - current_round: Round, - msg_height: BlockNumber, - msg_round: Round, - msg_description: &str, - ) -> bool { - let limits = &self.consensus_config.dynamic_config.future_msg_limit; - let height_diff = msg_height.saturating_sub(current_height.0); - - let should_cache = height_diff <= limits.future_height_limit.into() - // For current height, check against current round + future_round_limit - && (height_diff == 0 && msg_round <= current_round + limits.future_round_limit - // For future heights, check absolute round limit - || height_diff > 0 && msg_round <= limits.future_height_round_limit); - - if !should_cache { - warn!( - "Dropping {} for height={} round={} when current_height={} current_round={} - \ - limits: future_height={}, future_height_round={}, future_round={}", - msg_description, - msg_height, - msg_round, - current_height, - current_round, - limits.future_height_limit, - limits.future_height_round_limit, - limits.future_round_limit - ); - } - - should_cache - } - - fn should_cache_proposal( - &self, - current_height: &BlockNumber, - current_round: Round, - proposal: &ProposalInit, - ) -> bool { - self.should_cache_msg( - current_height, - current_round, - proposal.height, - proposal.round, - "proposal", - ) - } - - fn should_cache_vote( - &self, - current_height: &BlockNumber, - current_round: Round, - vote: &Vote, - ) -> bool { - self.should_cache_msg(current_height, current_round, vote.height, vote.round, "vote") - } } /// Creates a closure that returns the proposer for a given round at the specified height.