157 changes: 88 additions & 69 deletions crates/apollo_consensus/src/manager.rs
@@ -13,7 +13,11 @@ use std::collections::{BTreeMap, VecDeque};
use std::sync::{Arc, Mutex};

use apollo_config_manager_types::communication::SharedConfigManagerClient;
use apollo_consensus_config::config::{ConsensusConfig, ConsensusDynamicConfig};
use apollo_consensus_config::config::{
ConsensusConfig,
ConsensusDynamicConfig,
FutureMsgLimitsConfig,
};
use apollo_infra_utils::debug_every_n_sec;
use apollo_network::network_manager::BroadcastTopicClientTrait;
use apollo_network_types::network_types::BroadcastedMessageMetadata;
@@ -180,11 +184,22 @@ struct ConsensusCache<ContextT: ConsensusContext> {
// Mapping: { Height : { Round : (Init, Receiver)}}
future_proposals_cache:
BTreeMap<BlockNumber, BTreeMap<Round, ProposalReceiverTuple<ContextT::ProposalPart>>>,
/// Configuration for determining which messages should be cached.
future_msg_limit: FutureMsgLimitsConfig,
}
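
For orientation, the `Mapping: { Height : { Round : (Init, Receiver)}}` comment above describes a nested map keyed first by height and then by round. Below is a minimal standalone sketch of that shape using plain standard-library maps; `BlockNumber`, `Round`, and the stored value are simplified stand-ins (the real cache stores a `(ProposalInit, Receiver)` tuple), and the duplicate handling mirrors the "new proposals with the same height/round will be ignored" note further down in this diff.

```rust
use std::collections::BTreeMap;

// Simplified stand-ins for the real types used by the cache.
type BlockNumber = u64;
type Round = u32;

fn cache_future_proposal(
    cache: &mut BTreeMap<BlockNumber, BTreeMap<Round, String>>,
    height: BlockNumber,
    round: Round,
    entry: String,
) {
    // Insert only if the (height, round) slot is empty; a later proposal for the
    // same slot is ignored.
    cache.entry(height).or_default().entry(round).or_insert(entry);
}

fn main() {
    let mut cache = BTreeMap::new();
    cache_future_proposal(&mut cache, 11, 0, "first proposal".to_string());
    cache_future_proposal(&mut cache, 11, 0, "second proposal".to_string()); // ignored
    assert_eq!(cache[&11][&0], "first proposal");
}
```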

impl<ContextT: ConsensusContext> ConsensusCache<ContextT> {
fn new() -> Self {
Self { future_votes: BTreeMap::new(), future_proposals_cache: BTreeMap::new() }
fn new(future_msg_limit: FutureMsgLimitsConfig) -> Self {
Self {
future_votes: BTreeMap::new(),
future_proposals_cache: BTreeMap::new(),
future_msg_limit,
}
}

/// Update the future message limits configuration.
fn set_future_msg_limit(&mut self, future_msg_limit: FutureMsgLimitsConfig) {
self.future_msg_limit = future_msg_limit;
}

/// Filters the cached messages:
@@ -265,6 +280,65 @@ impl<ContextT: ConsensusContext> ConsensusCache<ContextT> {
self.future_votes.get(&height).map(|votes| votes.len()).unwrap_or(0);
CONSENSUS_CACHED_VOTES.set_lossy(cached_votes_count);
}

fn should_cache_msg(
&self,
current_height: &BlockNumber,
current_round: Round,
msg_height: BlockNumber,
msg_round: Round,
msg_description: &str,
) -> bool {
let limits = &self.future_msg_limit;
let height_diff = msg_height.saturating_sub(current_height.0);

let should_cache = height_diff <= limits.future_height_limit.into()
// For current height, check against current round + future_round_limit
&& (height_diff == 0 && msg_round <= current_round + limits.future_round_limit
// For future heights, check absolute round limit
|| height_diff > 0 && msg_round <= limits.future_height_round_limit);

if !should_cache {
warn!(
"Dropping {} for height={} round={} when current_height={} current_round={} - \
limits: future_height={}, future_height_round={}, future_round={}",
msg_description,
msg_height,
msg_round,
current_height,
current_round,
limits.future_height_limit,
limits.future_height_round_limit,
limits.future_round_limit
);
}

should_cache
}

fn should_cache_proposal(
&self,
current_height: &BlockNumber,
current_round: Round,
proposal: &ProposalInit,
) -> bool {
self.should_cache_msg(
current_height,
current_round,
proposal.height,
proposal.round,
"proposal",
)
}

fn should_cache_vote(
&self,
current_height: &BlockNumber,
current_round: Round,
vote: &Vote,
) -> bool {
self.should_cache_msg(current_height, current_round, vote.height, vote.round, "vote")
}
}
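
To make the limit rule in `should_cache_msg` concrete: messages are cached only if their height is at most `future_height_limit` ahead, and their round is within `current_round + future_round_limit` for the current height or within the absolute `future_height_round_limit` for future heights. The sketch below re-states that predicate with a few illustrative inputs; the limit field names follow the diff, while the standalone types, the numeric widths, and the harness are assumptions for illustration only.

```rust
// Standalone sketch of the caching predicate above (assumed: Round = u32,
// heights as plain u64); limit field names follow the diff.
type Round = u32;

struct FutureMsgLimitsConfig {
    future_height_limit: u32,
    future_height_round_limit: Round,
    future_round_limit: Round,
}

fn should_cache(
    limits: &FutureMsgLimitsConfig,
    current_height: u64,
    current_round: Round,
    msg_height: u64,
    msg_round: Round,
) -> bool {
    let height_diff = msg_height.saturating_sub(current_height);
    height_diff <= limits.future_height_limit.into()
        // Current height: allow up to `future_round_limit` rounds past the current round.
        && (height_diff == 0 && msg_round <= current_round + limits.future_round_limit
        // Future heights: allow rounds up to the absolute `future_height_round_limit`.
        || height_diff > 0 && msg_round <= limits.future_height_round_limit)
}

fn main() {
    let limits = FutureMsgLimitsConfig {
        future_height_limit: 2,
        future_height_round_limit: 1,
        future_round_limit: 5,
    };
    // Current height, a few rounds ahead: cached.
    assert!(should_cache(&limits, 10, 3, 10, 8));
    // Current height, too many rounds ahead: dropped.
    assert!(!should_cache(&limits, 10, 3, 10, 9));
    // Next height, round within the absolute limit: cached.
    assert!(should_cache(&limits, 10, 3, 11, 1));
    // Next height, round past the absolute limit: dropped.
    assert!(!should_cache(&limits, 10, 3, 11, 2));
    // Height too far in the future: dropped regardless of round.
    assert!(!should_cache(&limits, 10, 3, 13, 0));
}
```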

/// Runs Tendermint repeatedly across different heights. Handles issues which are not explicitly
@@ -277,8 +351,6 @@ struct MultiHeightManager<ContextT: ConsensusContext> {
// This is an Arc<Mutex> because we cannot share this instance mutably with
// SingleHeightConsensus in a simpler way, despite them never using it at the same time,
// due to Rust limitations.
// TODO(guy.f): Remove in the following PR.
#[allow(dead_code)]
voted_height_storage: Arc<Mutex<dyn HeightVotedStorageTrait>>,
// Proposal content streams keyed by (height, round)
current_height_proposals_streams:
@@ -298,18 +370,20 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
.expect("Lock should never be poisoned")
.get_prev_voted_height()
.expect("Failed to get previous voted height from storage");
let future_msg_limit = consensus_config.dynamic_config.future_msg_limit;
Self {
consensus_config,
quorum_type,
last_voted_height_at_initialization,
voted_height_storage,
current_height_proposals_streams: BTreeMap::new(),
cache: ConsensusCache::new(),
cache: ConsensusCache::new(future_msg_limit),
}
}

/// Apply the full dynamic consensus configuration. Call only between heights.
pub(crate) fn set_dynamic_config(&mut self, cfg: ConsensusDynamicConfig) {
self.cache.set_future_msg_limit(cfg.future_msg_limit);
self.consensus_config.dynamic_config = cfg;
}

@@ -616,7 +690,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {

match proposal_init.height.cmp(&height) {
std::cmp::Ordering::Greater => {
if self.should_cache_proposal(&height, 0, &proposal_init) {
if self.cache.should_cache_proposal(&height, 0, &proposal_init) {
debug!("Received a proposal for a future height. {:?}", proposal_init);
// Note: new proposals with the same height/round will be ignored.
//
@@ -636,7 +710,11 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
}
std::cmp::Ordering::Equal => match shc {
Some(shc) => {
if self.should_cache_proposal(&height, shc.current_round(), &proposal_init) {
if self.cache.should_cache_proposal(
&height,
shc.current_round(),
&proposal_init,
) {
self.handle_proposal_known_init(
context,
height,
@@ -717,7 +795,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
match message.height.cmp(&height) {
std::cmp::Ordering::Greater => {
if self.should_cache_vote(&height, 0, &message) {
if self.cache.should_cache_vote(&height, 0, &message) {
trace!("Cache message for a future height. {:?}", message);
self.cache.cache_future_vote(message);
}
@@ -729,7 +807,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
}
std::cmp::Ordering::Equal => match shc {
Some(shc) => {
if self.should_cache_vote(&height, shc.current_round(), &message) {
if self.cache.should_cache_vote(&height, shc.current_round(), &message) {
let leader_fn = make_leader_fn(context, height);
shc.handle_vote(&leader_fn, message)
} else {
@@ -869,65 +947,6 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
}
}
}

fn should_cache_msg(
&self,
current_height: &BlockNumber,
current_round: Round,
msg_height: BlockNumber,
msg_round: Round,
msg_description: &str,
) -> bool {
let limits = &self.consensus_config.dynamic_config.future_msg_limit;
let height_diff = msg_height.saturating_sub(current_height.0);

let should_cache = height_diff <= limits.future_height_limit.into()
// For current height, check against current round + future_round_limit
&& (height_diff == 0 && msg_round <= current_round + limits.future_round_limit
// For future heights, check absolute round limit
|| height_diff > 0 && msg_round <= limits.future_height_round_limit);

if !should_cache {
warn!(
"Dropping {} for height={} round={} when current_height={} current_round={} - \
limits: future_height={}, future_height_round={}, future_round={}",
msg_description,
msg_height,
msg_round,
current_height,
current_round,
limits.future_height_limit,
limits.future_height_round_limit,
limits.future_round_limit
);
}

should_cache
}

fn should_cache_proposal(
&self,
current_height: &BlockNumber,
current_round: Round,
proposal: &ProposalInit,
) -> bool {
self.should_cache_msg(
current_height,
current_round,
proposal.height,
proposal.round,
"proposal",
)
}

fn should_cache_vote(
&self,
current_height: &BlockNumber,
current_round: Round,
vote: &Vote,
) -> bool {
self.should_cache_msg(current_height, current_round, vote.height, vote.round, "vote")
}
}

/// Creates a closure that returns the proposer for a given round at the specified height.