Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions crates/apollo_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,13 @@ struct ConsensusCache<ContextT: ConsensusContext> {
// Mapping: { Height : Vec<Vote> }
future_votes: BTreeMap<BlockNumber, Vec<Vote>>,
// Mapping: { Height : { Round : (Init, Receiver)}}
cached_proposals:
future_proposals_cache:
BTreeMap<BlockNumber, BTreeMap<Round, ProposalReceiverTuple<ContextT::ProposalPart>>>,
}

impl<ContextT: ConsensusContext> ConsensusCache<ContextT> {
fn new() -> Self {
Self { future_votes: BTreeMap::new(), cached_proposals: BTreeMap::new() }
Self { future_votes: BTreeMap::new(), future_proposals_cache: BTreeMap::new() }
}

/// Filters the cached messages:
Expand Down Expand Up @@ -214,7 +214,7 @@ impl<ContextT: ConsensusContext> ConsensusCache<ContextT> {
height: BlockNumber,
) -> Vec<(ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)> {
loop {
let Some(entry) = self.cached_proposals.first_entry() else {
let Some(entry) = self.future_proposals_cache.first_entry() else {
return Vec::new();
};
match entry.key().cmp(&height) {
Expand Down Expand Up @@ -247,7 +247,7 @@ impl<ContextT: ConsensusContext> ConsensusCache<ContextT> {
proposal_init: ProposalInit,
content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
) {
self.cached_proposals
self.future_proposals_cache
.entry(proposal_init.height)
.or_default()
.entry(proposal_init.round)
Expand All @@ -256,7 +256,7 @@ impl<ContextT: ConsensusContext> ConsensusCache<ContextT> {

fn report_max_cached_block_number_metric(&self, height: BlockNumber) {
// If nothing is cached use current height as "max".
let max_cached_block_number = self.cached_proposals.keys().max().unwrap_or(&height);
let max_cached_block_number = self.future_proposals_cache.keys().max().unwrap_or(&height);
CONSENSUS_MAX_CACHED_BLOCK_NUMBER.set_lossy(max_cached_block_number.0);
}

Expand All @@ -281,7 +281,8 @@ struct MultiHeightManager<ContextT: ConsensusContext> {
#[allow(dead_code)]
voted_height_storage: Arc<Mutex<dyn HeightVotedStorageTrait>>,
// Proposal content streams keyed by (height, round)
proposal_streams: BTreeMap<(BlockNumber, Round), mpsc::Receiver<ContextT::ProposalPart>>,
current_height_proposals_streams:
BTreeMap<(BlockNumber, Round), mpsc::Receiver<ContextT::ProposalPart>>,
cache: ConsensusCache<ContextT>,
}

Expand All @@ -302,7 +303,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
quorum_type,
last_voted_height_at_initialization,
voted_height_storage,
proposal_streams: BTreeMap::new(),
current_height_proposals_streams: BTreeMap::new(),
cache: ConsensusCache::new(),
}
}
Expand Down Expand Up @@ -353,7 +354,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
}

// Height completed; clear any content streams associated with current and lower heights.
self.proposal_streams.retain(|(h, _), _| *h > height);
self.current_height_proposals_streams.retain(|(h, _), _| *h > height);

Ok(res)
}
Expand Down Expand Up @@ -665,7 +666,8 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
) -> Result<ShcReturn, ConsensusError> {
// Store the stream; requests will reference it by (height, round)
self.proposal_streams.insert((height, proposal_init.round), content_receiver);
self.current_height_proposals_streams
.insert((height, proposal_init.round), content_receiver);
let leader_fn = make_leader_fn(context, height);
shc.handle_proposal(&leader_fn, proposal_init)
}
Expand Down Expand Up @@ -792,7 +794,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
SMRequest::StartValidateProposal(init) => {
// Look up the stored stream.
let key = (height, init.round);
if let Some(stream) = self.proposal_streams.remove(&key) {
if let Some(stream) = self.current_height_proposals_streams.remove(&key) {
let timeout = timeouts.get_proposal_timeout(init.round);
let receiver = context.validate_proposal(init, timeout, stream).await;
let round = init.round;
Expand Down