
Commit fc6df0b

apollo_consensus: centralize future vote/proposal caching via ConsensusCache
1 parent c11137e commit fc6df0b

File tree: 1 file changed (+110 −81 lines)

crates/apollo_consensus/src/manager.rs

Lines changed: 110 additions & 81 deletions
@@ -172,16 +172,107 @@ pub enum RunHeightRes {
 
 type ProposalReceiverTuple<T> = (ProposalInit, mpsc::Receiver<T>);
 
+/// Manages votes and proposals for future heights.
+#[derive(Debug)]
+struct ConsensusCache<ContextT: ConsensusContext> {
+    // Mapping: { Height : Vec<Vote> }
+    future_votes: BTreeMap<BlockNumber, Vec<Vote>>,
+    // Mapping: { Height : { Round : (Init, Receiver)}}
+    cached_proposals:
+        BTreeMap<BlockNumber, BTreeMap<Round, ProposalReceiverTuple<ContextT::ProposalPart>>>,
+}
+
+impl<ContextT: ConsensusContext> ConsensusCache<ContextT> {
+    fn new() -> Self {
+        Self { future_votes: BTreeMap::new(), cached_proposals: BTreeMap::new() }
+    }
+
+    /// Filters the cached messages:
+    /// - returns (and removes from stored votes) all of the current height votes.
+    /// - drops votes from earlier heights.
+    /// - retains future votes in the cache.
+    fn get_current_height_votes(&mut self, height: BlockNumber) -> Vec<Vote> {
+        loop {
+            let Some(entry) = self.future_votes.first_entry() else {
+                return Vec::new();
+            };
+            match entry.key().cmp(&height) {
+                std::cmp::Ordering::Greater => return Vec::new(),
+                std::cmp::Ordering::Equal => return entry.remove(),
+                std::cmp::Ordering::Less => {
+                    entry.remove();
+                }
+            }
+        }
+    }
+
+    /// Checks if a cached proposal already exists (with correct height)
+    /// - returns the proposals for the height if they exist and removes them from the cache.
+    /// - cleans up any proposals from earlier heights.
+    fn get_current_height_proposals(
+        &mut self,
+        height: BlockNumber,
+    ) -> Vec<(ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)> {
+        loop {
+            let Some(entry) = self.cached_proposals.first_entry() else {
+                return Vec::new();
+            };
+            match entry.key().cmp(&height) {
+                std::cmp::Ordering::Greater => return Vec::new(),
+                std::cmp::Ordering::Equal => {
+                    let round_to_proposals = entry.remove();
+                    return round_to_proposals.into_values().collect();
+                }
+                std::cmp::Ordering::Less => {
+                    entry.remove();
+                }
+            }
+        }
+    }
+
+    /// Clears any cached messages for the given height or any lower height.
+    fn clear_past_and_current_heights(&mut self, height: BlockNumber) {
+        self.get_current_height_votes(height);
+        self.get_current_height_proposals(height);
+    }
+
+    /// Caches a vote for a future height.
+    fn cache_future_vote(&mut self, vote: Vote) {
+        self.future_votes.entry(BlockNumber(vote.height)).or_default().push(vote);
+    }
+
+    /// Caches a proposal for a future height.
+    fn cache_future_proposal(
+        &mut self,
+        proposal_init: ProposalInit,
+        content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
+    ) {
+        self.cached_proposals
+            .entry(proposal_init.height)
+            .or_default()
+            .entry(proposal_init.round)
+            .or_insert((proposal_init, content_receiver));
+    }
+
+    fn report_max_cached_block_number_metric(&self, height: BlockNumber) {
+        // If nothing is cached use current height as "max".
+        let max_cached_block_number = self.cached_proposals.keys().max().unwrap_or(&height);
+        CONSENSUS_MAX_CACHED_BLOCK_NUMBER.set_lossy(max_cached_block_number.0);
+    }
+
+    fn report_cached_votes_metric(&self, height: BlockNumber) {
+        let cached_votes_count =
+            self.future_votes.get(&height).map(|votes| votes.len()).unwrap_or(0);
+        CONSENSUS_CACHED_VOTES.set_lossy(cached_votes_count);
+    }
+}
+
 /// Runs Tendermint repeatedly across different heights. Handles issues which are not explicitly
 /// part of the single height consensus algorithm (e.g. messages from future heights).
 #[derive(Debug)]
 struct MultiHeightManager<ContextT: ConsensusContext> {
     consensus_config: ConsensusConfig,
-    future_votes: BTreeMap<BlockNumber, Vec<Vote>>,
     quorum_type: QuorumType,
-    // Mapping: { Height : { Round : (Init, Receiver)}}
-    cached_proposals:
-        BTreeMap<BlockNumber, BTreeMap<Round, ProposalReceiverTuple<ContextT::ProposalPart>>>,
     last_voted_height_at_initialization: Option<BlockNumber>,
     // The reason for this Arc<Mutex> we cannot share this instance mutably with
     // SingleHeightConsensus despite them not ever using it at the same time in a simpler way, due
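The `get_current_height_*` methods above both drain the height-keyed `BTreeMap` from its smallest key. A minimal standalone sketch of that pattern (plain `u64` heights and string votes are stand-ins here, not the crate's `BlockNumber`/`Vote` types):

```rust
use std::collections::BTreeMap;

/// Drain-from-the-front over a height-keyed cache, mirroring the shape of
/// `ConsensusCache::get_current_height_votes` with simplified types.
fn take_current_height(
    cache: &mut BTreeMap<u64, Vec<&'static str>>,
    height: u64,
) -> Vec<&'static str> {
    loop {
        // `first_entry` yields the smallest key, so stale heights are met first.
        let Some(entry) = cache.first_entry() else {
            return Vec::new();
        };
        match entry.key().cmp(&height) {
            // Smallest cached height is already in the future: keep everything.
            std::cmp::Ordering::Greater => return Vec::new(),
            // Exact match: hand the entries to the caller and drop them from the cache.
            std::cmp::Ordering::Equal => return entry.remove(),
            // Stale height: discard it and keep scanning.
            std::cmp::Ordering::Less => {
                entry.remove();
            }
        }
    }
}

fn main() {
    let mut cache = BTreeMap::new();
    cache.insert(9, vec!["old vote"]);
    cache.insert(10, vec!["vote a", "vote b"]);
    cache.insert(11, vec!["future vote"]);

    assert_eq!(take_current_height(&mut cache, 10), vec!["vote a", "vote b"]);
    // Height 9 was dropped as stale; height 11 stays cached for later.
    assert_eq!(cache.keys().copied().collect::<Vec<_>>(), vec![11]);
}
```

Because `BTreeMap` keeps keys in ascending order, one pass from the front discards stale heights, returns the current height, and leaves strictly future heights untouched.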
@@ -191,6 +282,7 @@ struct MultiHeightManager<ContextT: ConsensusContext> {
     voted_height_storage: Arc<Mutex<dyn HeightVotedStorageTrait>>,
     // Proposal content streams keyed by (height, round)
     proposal_streams: BTreeMap<(BlockNumber, Round), mpsc::Receiver<ContextT::ProposalPart>>,
+    cache: ConsensusCache<ContextT>,
 }
 
 impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
@@ -208,11 +300,10 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         Self {
             consensus_config,
             quorum_type,
-            future_votes: BTreeMap::new(),
-            cached_proposals: BTreeMap::new(),
             last_voted_height_at_initialization,
             voted_height_storage,
             proposal_streams: BTreeMap::new(),
+            cache: ConsensusCache::new(),
         }
     }
 
@@ -246,25 +337,17 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         self.run_height_inner(context, height, broadcast_channels, proposals_receiver).await?;
 
         // Clear any existing votes and proposals for previous heights as well as the current just
-        // completed height.
-        //
-        // Networking layer assumes messages are handled in a timely fashion, otherwise we may build
-        // up a backlog of useless messages. Similarly we don't want to waste space on old messages.
-        // This is particularly important when there is a significant lag and we continually finish
-        // heights immediately due to sync.
-
-        // We use get_current_height_votes for its side effect of removing votes for lower
-        // heights (we don't care about the actual votes).
-        self.get_current_height_votes(height);
+        // completed height using the dedicated cache manager.
+        self.cache.clear_past_and_current_heights(height);
+
+        // Clear any votes/proposals that might have arrived *during* the final await/cleanup,
+        // which still belong to the completed height or lower.
         while let Some(message) =
             broadcast_channels.broadcasted_messages_receiver.next().now_or_never()
         {
-            // Discard any votes for this heigh or lower by sending a None SHC.
+            // Discard any votes for this height or lower by sending a None SHC.
             self.handle_vote(context, height, None, message, broadcast_channels).await?;
         }
-        // We call this method to filter out any proposals for previous/current heights (we don't
-        // care about the returned proposals).
-        self.get_current_height_proposals(height);
         while let Ok(content_receiver) = proposals_receiver.try_next() {
             self.handle_proposal(context, height, None, content_receiver).await?;
         }
@@ -300,7 +383,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         proposals_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
     ) -> Result<RunHeightRes, ConsensusError> {
         CONSENSUS_BLOCK_NUMBER.set_lossy(height.0);
-        self.report_max_cached_block_number_metric(height);
+        self.cache.report_max_cached_block_number_metric(height);
 
         // If we already voted for this height, do not proceed until we sync to this height.
         // Otherwise, just check if we can sync to this height, immediately. If not, proceed with
@@ -364,7 +447,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         loop {
             // Reflect current height/round to context.
             context.set_height_and_round(height, shc.current_round()).await;
-            self.report_max_cached_block_number_metric(height);
+            self.cache.report_max_cached_block_number_metric(height);
             let shc_return = tokio::select! {
                 message = broadcast_channels.broadcasted_messages_receiver.next() => {
                     self.handle_vote(context, height, Some(&mut shc), message, broadcast_channels).await?
@@ -415,7 +498,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         height: BlockNumber,
         shc: &mut SingleHeightConsensus,
     ) -> Result<ShcReturn, ConsensusError> {
-        CONSENSUS_CACHED_VOTES.set_lossy(self.future_votes.entry(height).or_default().len());
+        self.cache.report_cached_votes_metric(height);
         let mut pending_requests = {
             let leader_fn = make_leader_fn(context, height);
             match shc.start(&leader_fn)? {
@@ -431,7 +514,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
             }
         };
 
-        let cached_proposals = self.get_current_height_proposals(height);
+        let cached_proposals = self.cache.get_current_height_proposals(height);
         trace!("Cached proposals for height {}: {:?}", height, cached_proposals);
         for (init, content_receiver) in cached_proposals {
             match self
@@ -443,7 +526,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
             }
         }
 
-        let cached_votes = self.get_current_height_votes(height);
+        let cached_votes = self.cache.get_current_height_votes(height);
         trace!("Cached votes for height {}: {:?}", height, cached_votes);
         for msg in cached_votes {
             let leader_fn = make_leader_fn(context, height);
@@ -496,11 +579,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         // "good" nodes can propose).
         //
         // When moving to version 1.0 make sure this is addressed.
-        self.cached_proposals
-            .entry(proposal_init.height)
-            .or_default()
-            .entry(proposal_init.round)
-            .or_insert((proposal_init, content_receiver));
+        self.cache.cache_future_proposal(proposal_init, content_receiver);
         }
         Ok(ShcReturn::Requests(VecDeque::new()))
     }
@@ -592,7 +671,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
             std::cmp::Ordering::Greater => {
                 if self.should_cache_vote(&height, 0, &message) {
                     trace!("Cache message for a future height. {:?}", message);
-                    self.future_votes.entry(BlockNumber(message.height)).or_default().push(message);
+                    self.cache.cache_future_vote(message);
                 }
                 Ok(ShcReturn::Requests(VecDeque::new()))
             }
@@ -748,56 +827,6 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
         }
     }
 
-    /// Checks if a cached proposal already exists (with correct height)
-    /// - returns the proposals for the height if they exist and removes them from the cache.
-    /// - cleans up any proposals from earlier heights.
-    fn get_current_height_proposals(
-        &mut self,
-        height: BlockNumber,
-    ) -> Vec<(ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)> {
-        loop {
-            let Some(entry) = self.cached_proposals.first_entry() else {
-                return Vec::new();
-            };
-            match entry.key().cmp(&height) {
-                std::cmp::Ordering::Greater => return vec![],
-                std::cmp::Ordering::Equal => {
-                    let round_to_proposals = entry.remove();
-                    return round_to_proposals.into_values().collect();
-                }
-                std::cmp::Ordering::Less => {
-                    entry.remove();
-                }
-            }
-        }
-    }
-
-    /// Filters the cached messages:
-    /// - returns (and removes from stored votes) all of the current height votes.
-    /// - drops votes from earlier heights.
-    /// - retains future votes in the cache.
-    fn get_current_height_votes(&mut self, height: BlockNumber) -> Vec<Vote> {
-        // Depends on `future_votes` being sorted by height.
-        loop {
-            let Some(entry) = self.future_votes.first_entry() else {
-                return Vec::new();
-            };
-            match entry.key().cmp(&height) {
-                std::cmp::Ordering::Greater => return Vec::new(),
-                std::cmp::Ordering::Equal => return entry.remove(),
-                std::cmp::Ordering::Less => {
-                    entry.remove();
-                }
-            }
-        }
-    }
-
-    fn report_max_cached_block_number_metric(&self, height: BlockNumber) {
-        // If nothing is cached use current height as "max".
-        let max_cached_block_number = self.cached_proposals.keys().max().unwrap_or(&height);
-        CONSENSUS_MAX_CACHED_BLOCK_NUMBER.set_lossy(max_cached_block_number.0);
-    }
-
     fn should_cache_msg(
         &self,
         current_height: &BlockNumber,
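In `cache_future_proposal`, the nested `entry(...).or_default().entry(...).or_insert(...)` chain keeps only the first proposal seen for a given (height, round); later conflicting proposals for the same slot are ignored. A small illustration of that `or_insert` behavior, with plain integers and strings as stand-ins for the crate's `ProposalInit` and content receiver:

```rust
use std::collections::BTreeMap;

fn main() {
    // Height -> (Round -> proposal payload); mirrors the shape of `cached_proposals`.
    let mut cached: BTreeMap<u64, BTreeMap<u32, &str>> = BTreeMap::new();

    // First proposal for (height 12, round 0) is stored.
    cached.entry(12).or_default().entry(0).or_insert("first proposal for (12, 0)");
    // A later, conflicting proposal for the same (height, round) is dropped by `or_insert`.
    cached.entry(12).or_default().entry(0).or_insert("conflicting duplicate");
    // A different round at the same height is stored separately.
    cached.entry(12).or_default().entry(1).or_insert("proposal for (12, 1)");

    assert_eq!(cached[&12][&0], "first proposal for (12, 0)");
    assert_eq!(cached[&12][&1], "proposal for (12, 1)");
}
```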
