Skip to content

Commit 9607992

Browse files
apollo_consensus: move caching decision logic to ConsensusCache
1 parent 30bc6f9 commit 9607992

File tree

1 file changed

+88
-69
lines changed

1 file changed

+88
-69
lines changed

crates/apollo_consensus/src/manager.rs

Lines changed: 88 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ use std::collections::{BTreeMap, VecDeque};
1313
use std::sync::{Arc, Mutex};
1414

1515
use apollo_config_manager_types::communication::SharedConfigManagerClient;
16-
use apollo_consensus_config::config::{ConsensusConfig, ConsensusDynamicConfig};
16+
use apollo_consensus_config::config::{
17+
ConsensusConfig,
18+
ConsensusDynamicConfig,
19+
FutureMsgLimitsConfig,
20+
};
1721
use apollo_infra_utils::debug_every_n_sec;
1822
use apollo_network::network_manager::BroadcastTopicClientTrait;
1923
use apollo_network_types::network_types::BroadcastedMessageMetadata;
@@ -180,11 +184,22 @@ struct ConsensusCache<ContextT: ConsensusContext> {
180184
// Mapping: { Height : { Round : (Init, Receiver)}}
181185
future_proposals_cache:
182186
BTreeMap<BlockNumber, BTreeMap<Round, ProposalReceiverTuple<ContextT::ProposalPart>>>,
187+
/// Configuration for determining which messages should be cached.
188+
future_msg_limit: FutureMsgLimitsConfig,
183189
}
184190

185191
impl<ContextT: ConsensusContext> ConsensusCache<ContextT> {
186-
fn new() -> Self {
187-
Self { future_votes: BTreeMap::new(), future_proposals_cache: BTreeMap::new() }
192+
fn new(future_msg_limit: FutureMsgLimitsConfig) -> Self {
193+
Self {
194+
future_votes: BTreeMap::new(),
195+
future_proposals_cache: BTreeMap::new(),
196+
future_msg_limit,
197+
}
198+
}
199+
200+
/// Update the future message limits configuration.
201+
fn set_future_msg_limit(&mut self, future_msg_limit: FutureMsgLimitsConfig) {
202+
self.future_msg_limit = future_msg_limit;
188203
}
189204

190205
/// Filters the cached messages:
@@ -265,6 +280,65 @@ impl<ContextT: ConsensusContext> ConsensusCache<ContextT> {
265280
self.future_votes.get(&height).map(|votes| votes.len()).unwrap_or(0);
266281
CONSENSUS_CACHED_VOTES.set_lossy(cached_votes_count);
267282
}
283+
284+
fn should_cache_msg(
285+
&self,
286+
current_height: &BlockNumber,
287+
current_round: Round,
288+
msg_height: BlockNumber,
289+
msg_round: Round,
290+
msg_description: &str,
291+
) -> bool {
292+
let limits = &self.future_msg_limit;
293+
let height_diff = msg_height.saturating_sub(current_height.0);
294+
295+
let should_cache = height_diff <= limits.future_height_limit.into()
296+
// For current height, check against current round + future_round_limit
297+
&& (height_diff == 0 && msg_round <= current_round + limits.future_round_limit
298+
// For future heights, check absolute round limit
299+
|| height_diff > 0 && msg_round <= limits.future_height_round_limit);
300+
301+
if !should_cache {
302+
warn!(
303+
"Dropping {} for height={} round={} when current_height={} current_round={} - \
304+
limits: future_height={}, future_height_round={}, future_round={}",
305+
msg_description,
306+
msg_height,
307+
msg_round,
308+
current_height,
309+
current_round,
310+
limits.future_height_limit,
311+
limits.future_height_round_limit,
312+
limits.future_round_limit
313+
);
314+
}
315+
316+
should_cache
317+
}
318+
319+
fn should_cache_proposal(
320+
&self,
321+
current_height: &BlockNumber,
322+
current_round: Round,
323+
proposal: &ProposalInit,
324+
) -> bool {
325+
self.should_cache_msg(
326+
current_height,
327+
current_round,
328+
proposal.height,
329+
proposal.round,
330+
"proposal",
331+
)
332+
}
333+
334+
fn should_cache_vote(
335+
&self,
336+
current_height: &BlockNumber,
337+
current_round: Round,
338+
vote: &Vote,
339+
) -> bool {
340+
self.should_cache_msg(current_height, current_round, vote.height, vote.round, "vote")
341+
}
268342
}
269343

270344
/// Runs Tendermint repeatedly across different heights. Handles issues which are not explicitly
@@ -277,8 +351,6 @@ struct MultiHeightManager<ContextT: ConsensusContext> {
277351
// The reason for this Arc<Mutex> we cannot share this instance mutably with
278352
// SingleHeightConsensus despite them not ever using it at the same time in a simpler way, due
279353
// rust limitations.
280-
// TODO(guy.f): Remove in the following PR.
281-
#[allow(dead_code)]
282354
voted_height_storage: Arc<Mutex<dyn HeightVotedStorageTrait>>,
283355
// Proposal content streams keyed by (height, round)
284356
current_height_proposals_streams:
@@ -298,18 +370,20 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
298370
.expect("Lock should never be poisoned")
299371
.get_prev_voted_height()
300372
.expect("Failed to get previous voted height from storage");
373+
let future_msg_limit = consensus_config.dynamic_config.future_msg_limit;
301374
Self {
302375
consensus_config,
303376
quorum_type,
304377
last_voted_height_at_initialization,
305378
voted_height_storage,
306379
current_height_proposals_streams: BTreeMap::new(),
307-
cache: ConsensusCache::new(),
380+
cache: ConsensusCache::new(future_msg_limit),
308381
}
309382
}
310383

311384
/// Apply the full dynamic consensus configuration. Call only between heights.
312385
pub(crate) fn set_dynamic_config(&mut self, cfg: ConsensusDynamicConfig) {
386+
self.cache.set_future_msg_limit(cfg.future_msg_limit);
313387
self.consensus_config.dynamic_config = cfg;
314388
}
315389

@@ -616,7 +690,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
616690

617691
match proposal_init.height.cmp(&height) {
618692
std::cmp::Ordering::Greater => {
619-
if self.should_cache_proposal(&height, 0, &proposal_init) {
693+
if self.cache.should_cache_proposal(&height, 0, &proposal_init) {
620694
debug!("Received a proposal for a future height. {:?}", proposal_init);
621695
// Note: new proposals with the same height/round will be ignored.
622696
//
@@ -636,7 +710,11 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
636710
}
637711
std::cmp::Ordering::Equal => match shc {
638712
Some(shc) => {
639-
if self.should_cache_proposal(&height, shc.current_round(), &proposal_init) {
713+
if self.cache.should_cache_proposal(
714+
&height,
715+
shc.current_round(),
716+
&proposal_init,
717+
) {
640718
self.handle_proposal_known_init(
641719
context,
642720
height,
@@ -717,7 +795,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
717795
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
718796
match message.height.cmp(&height) {
719797
std::cmp::Ordering::Greater => {
720-
if self.should_cache_vote(&height, 0, &message) {
798+
if self.cache.should_cache_vote(&height, 0, &message) {
721799
trace!("Cache message for a future height. {:?}", message);
722800
self.cache.cache_future_vote(message);
723801
}
@@ -729,7 +807,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
729807
}
730808
std::cmp::Ordering::Equal => match shc {
731809
Some(shc) => {
732-
if self.should_cache_vote(&height, shc.current_round(), &message) {
810+
if self.cache.should_cache_vote(&height, shc.current_round(), &message) {
733811
let leader_fn = make_leader_fn(context, height);
734812
shc.handle_vote(&leader_fn, message)
735813
} else {
@@ -869,65 +947,6 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
869947
}
870948
}
871949
}
872-
873-
fn should_cache_msg(
874-
&self,
875-
current_height: &BlockNumber,
876-
current_round: Round,
877-
msg_height: BlockNumber,
878-
msg_round: Round,
879-
msg_description: &str,
880-
) -> bool {
881-
let limits = &self.consensus_config.dynamic_config.future_msg_limit;
882-
let height_diff = msg_height.saturating_sub(current_height.0);
883-
884-
let should_cache = height_diff <= limits.future_height_limit.into()
885-
// For current height, check against current round + future_round_limit
886-
&& (height_diff == 0 && msg_round <= current_round + limits.future_round_limit
887-
// For future heights, check absolute round limit
888-
|| height_diff > 0 && msg_round <= limits.future_height_round_limit);
889-
890-
if !should_cache {
891-
warn!(
892-
"Dropping {} for height={} round={} when current_height={} current_round={} - \
893-
limits: future_height={}, future_height_round={}, future_round={}",
894-
msg_description,
895-
msg_height,
896-
msg_round,
897-
current_height,
898-
current_round,
899-
limits.future_height_limit,
900-
limits.future_height_round_limit,
901-
limits.future_round_limit
902-
);
903-
}
904-
905-
should_cache
906-
}
907-
908-
fn should_cache_proposal(
909-
&self,
910-
current_height: &BlockNumber,
911-
current_round: Round,
912-
proposal: &ProposalInit,
913-
) -> bool {
914-
self.should_cache_msg(
915-
current_height,
916-
current_round,
917-
proposal.height,
918-
proposal.round,
919-
"proposal",
920-
)
921-
}
922-
923-
fn should_cache_vote(
924-
&self,
925-
current_height: &BlockNumber,
926-
current_round: Round,
927-
vote: &Vote,
928-
) -> bool {
929-
self.should_cache_msg(current_height, current_round, vote.height, vote.round, "vote")
930-
}
931950
}
932951

933952
/// Creates a closure that returns the proposer for a given round at the specified height.

0 commit comments

Comments
 (0)