Skip to content

Commit 1bc4d62

Browse files
apollo_consensus: centralize future vote/proposal caching via ConsensusCache
1 parent cc1b52c commit 1bc4d62

File tree

1 file changed

+110
-81
lines changed

1 file changed

+110
-81
lines changed

crates/apollo_consensus/src/manager.rs

Lines changed: 110 additions & 81 deletions
Original file line number | Diff line number | Diff line change
@@ -172,16 +172,107 @@ pub enum RunHeightRes {
172172

173173
type ProposalReceiverTuple<T> = (ProposalInit, mpsc::Receiver<T>);
174174

175+
/// Manages votes and proposals for future heights.
176+
#[derive(Debug)]
177+
struct ConsensusCache<ContextT: ConsensusContext> {
178+
// Mapping: { Height : Vec<Vote> }
179+
future_votes: BTreeMap<BlockNumber, Vec<Vote>>,
180+
// Mapping: { Height : { Round : (Init, Receiver)}}
181+
cached_proposals:
182+
BTreeMap<BlockNumber, BTreeMap<Round, ProposalReceiverTuple<ContextT::ProposalPart>>>,
183+
}
184+
185+
impl<ContextT: ConsensusContext> ConsensusCache<ContextT> {
186+
fn new() -> Self {
187+
Self { future_votes: BTreeMap::new(), cached_proposals: BTreeMap::new() }
188+
}
189+
190+
/// Filters the cached messages:
191+
/// - returns (and removes from stored votes) all of the current height votes.
192+
/// - drops votes from earlier heights.
193+
/// - retains future votes in the cache.
194+
fn get_current_height_votes(&mut self, height: BlockNumber) -> Vec<Vote> {
195+
loop {
196+
let Some(entry) = self.future_votes.first_entry() else {
197+
return Vec::new();
198+
};
199+
match entry.key().cmp(&height) {
200+
std::cmp::Ordering::Greater => return Vec::new(),
201+
std::cmp::Ordering::Equal => return entry.remove(),
202+
std::cmp::Ordering::Less => {
203+
entry.remove();
204+
}
205+
}
206+
}
207+
}
208+
209+
/// Checks if a cached proposal already exists (with correct height)
210+
/// - returns the proposals for the height if they exist and removes them from the cache.
211+
/// - cleans up any proposals from earlier heights.
212+
fn get_current_height_proposals(
213+
&mut self,
214+
height: BlockNumber,
215+
) -> Vec<(ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)> {
216+
loop {
217+
let Some(entry) = self.cached_proposals.first_entry() else {
218+
return Vec::new();
219+
};
220+
match entry.key().cmp(&height) {
221+
std::cmp::Ordering::Greater => return Vec::new(),
222+
std::cmp::Ordering::Equal => {
223+
let round_to_proposals = entry.remove();
224+
return round_to_proposals.into_values().collect();
225+
}
226+
std::cmp::Ordering::Less => {
227+
entry.remove();
228+
}
229+
}
230+
}
231+
}
232+
233+
/// Clears any cached messages for the given height or any lower height.
234+
fn clear_past_and_current_heights(&mut self, height: BlockNumber) {
235+
self.get_current_height_votes(height);
236+
self.get_current_height_proposals(height);
237+
}
238+
239+
/// Caches a vote for a future height.
240+
fn cache_future_vote(&mut self, vote: Vote) {
241+
self.future_votes.entry(BlockNumber(vote.height)).or_default().push(vote);
242+
}
243+
244+
/// Caches a proposal for a future height.
245+
fn cache_future_proposal(
246+
&mut self,
247+
proposal_init: ProposalInit,
248+
content_receiver: mpsc::Receiver<ContextT::ProposalPart>,
249+
) {
250+
self.cached_proposals
251+
.entry(proposal_init.height)
252+
.or_default()
253+
.entry(proposal_init.round)
254+
.or_insert((proposal_init, content_receiver));
255+
}
256+
257+
fn report_max_cached_block_number_metric(&self, height: BlockNumber) {
258+
// If nothing is cached use current height as "max".
259+
let max_cached_block_number = self.cached_proposals.keys().max().unwrap_or(&height);
260+
CONSENSUS_MAX_CACHED_BLOCK_NUMBER.set_lossy(max_cached_block_number.0);
261+
}
262+
263+
fn report_cached_votes_metric(&self, height: BlockNumber) {
264+
let cached_votes_count =
265+
self.future_votes.get(&height).map(|votes| votes.len()).unwrap_or(0);
266+
CONSENSUS_CACHED_VOTES.set_lossy(cached_votes_count);
267+
}
268+
}
269+
175270
/// Runs Tendermint repeatedly across different heights. Handles issues which are not explicitly
176271
/// part of the single height consensus algorithm (e.g. messages from future heights).
177272
#[derive(Debug)]
178273
struct MultiHeightManager<ContextT: ConsensusContext> {
179274
consensus_config: ConsensusConfig,
180-
future_votes: BTreeMap<BlockNumber, Vec<Vote>>,
181275
quorum_type: QuorumType,
182-
// Mapping: { Height : { Round : (Init, Receiver)}}
183-
cached_proposals:
184-
BTreeMap<BlockNumber, BTreeMap<Round, ProposalReceiverTuple<ContextT::ProposalPart>>>,
185276
last_voted_height_at_initialization: Option<BlockNumber>,
186277
// The reason for this Arc<Mutex> is that we cannot share this instance mutably with
187278
// SingleHeightConsensus despite them not ever using it at the same time in a simpler way, due
@@ -191,6 +282,7 @@ struct MultiHeightManager<ContextT: ConsensusContext> {
191282
voted_height_storage: Arc<Mutex<dyn HeightVotedStorageTrait>>,
192283
// Proposal content streams keyed by (height, round)
193284
proposal_streams: BTreeMap<(BlockNumber, Round), mpsc::Receiver<ContextT::ProposalPart>>,
285+
cache: ConsensusCache<ContextT>,
194286
}
195287

196288
impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
@@ -208,11 +300,10 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
208300
Self {
209301
consensus_config,
210302
quorum_type,
211-
future_votes: BTreeMap::new(),
212-
cached_proposals: BTreeMap::new(),
213303
last_voted_height_at_initialization,
214304
voted_height_storage,
215305
proposal_streams: BTreeMap::new(),
306+
cache: ConsensusCache::new(),
216307
}
217308
}
218309

@@ -246,25 +337,17 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
246337
self.run_height_inner(context, height, broadcast_channels, proposals_receiver).await?;
247338

248339
// Clear any existing votes and proposals for previous heights as well as the current just
249-
// completed height.
250-
//
251-
// Networking layer assumes messages are handled in a timely fashion, otherwise we may build
252-
// up a backlog of useless messages. Similarly we don't want to waste space on old messages.
253-
// This is particularly important when there is a significant lag and we continually finish
254-
// heights immediately due to sync.
255-
256-
// We use get_current_height_votes for its side effect of removing votes for lower
257-
// heights (we don't care about the actual votes).
258-
self.get_current_height_votes(height);
340+
// completed height using the dedicated cache manager.
341+
self.cache.clear_past_and_current_heights(height);
342+
343+
// Clear any votes/proposals that might have arrived *during* the final await/cleanup,
344+
// which still belong to the completed height or lower.
259345
while let Some(message) =
260346
broadcast_channels.broadcasted_messages_receiver.next().now_or_never()
261347
{
262-
// Discard any votes for this heigh or lower by sending a None SHC.
348+
// Discard any votes for this height or lower by sending a None SHC.
263349
self.handle_vote(context, height, None, message, broadcast_channels).await?;
264350
}
265-
// We call this method to filter out any proposals for previous/current heights (we don't
266-
// care about the returned proposals).
267-
self.get_current_height_proposals(height);
268351
while let Ok(content_receiver) = proposals_receiver.try_next() {
269352
self.handle_proposal(context, height, None, content_receiver).await?;
270353
}
@@ -300,7 +383,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
300383
proposals_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
301384
) -> Result<RunHeightRes, ConsensusError> {
302385
CONSENSUS_BLOCK_NUMBER.set_lossy(height.0);
303-
self.report_max_cached_block_number_metric(height);
386+
self.cache.report_max_cached_block_number_metric(height);
304387

305388
// If we already voted for this height, do not proceed until we sync to this height.
306389
// Otherwise, just check if we can sync to this height, immediately. If not, proceed with
@@ -364,7 +447,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
364447
loop {
365448
// Reflect current height/round to context.
366449
context.set_height_and_round(height, shc.current_round()).await;
367-
self.report_max_cached_block_number_metric(height);
450+
self.cache.report_max_cached_block_number_metric(height);
368451
let shc_return = tokio::select! {
369452
message = broadcast_channels.broadcasted_messages_receiver.next() => {
370453
self.handle_vote(context, height, Some(&mut shc), message, broadcast_channels).await?
@@ -415,7 +498,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
415498
height: BlockNumber,
416499
shc: &mut SingleHeightConsensus,
417500
) -> Result<ShcReturn, ConsensusError> {
418-
CONSENSUS_CACHED_VOTES.set_lossy(self.future_votes.entry(height).or_default().len());
501+
self.cache.report_cached_votes_metric(height);
419502
let mut pending_requests = {
420503
let leader_fn = make_leader_fn(context, height);
421504
match shc.start(&leader_fn)? {
@@ -430,7 +513,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
430513
}
431514
};
432515

433-
let cached_proposals = self.get_current_height_proposals(height);
516+
let cached_proposals = self.cache.get_current_height_proposals(height);
434517
trace!("Cached proposals for height {}: {:?}", height, cached_proposals);
435518
for (init, content_receiver) in cached_proposals {
436519
match self
@@ -442,7 +525,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
442525
}
443526
}
444527

445-
let cached_votes = self.get_current_height_votes(height);
528+
let cached_votes = self.cache.get_current_height_votes(height);
446529
trace!("Cached votes for height {}: {:?}", height, cached_votes);
447530
for msg in cached_votes {
448531
let leader_fn = make_leader_fn(context, height);
@@ -495,11 +578,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
495578
// "good" nodes can propose).
496579
//
497580
// When moving to version 1.0 make sure this is addressed.
498-
self.cached_proposals
499-
.entry(proposal_init.height)
500-
.or_default()
501-
.entry(proposal_init.round)
502-
.or_insert((proposal_init, content_receiver));
581+
self.cache.cache_future_proposal(proposal_init, content_receiver);
503582
}
504583
Ok(ShcReturn::Requests(VecDeque::new()))
505584
}
@@ -591,7 +670,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
591670
std::cmp::Ordering::Greater => {
592671
if self.should_cache_vote(&height, 0, &message) {
593672
trace!("Cache message for a future height. {:?}", message);
594-
self.future_votes.entry(BlockNumber(message.height)).or_default().push(message);
673+
self.cache.cache_future_vote(message);
595674
}
596675
Ok(ShcReturn::Requests(VecDeque::new()))
597676
}
@@ -747,56 +826,6 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
747826
}
748827
}
749828

750-
/// Checks if a cached proposal already exists (with correct height)
751-
/// - returns the proposals for the height if they exist and removes them from the cache.
752-
/// - cleans up any proposals from earlier heights.
753-
fn get_current_height_proposals(
754-
&mut self,
755-
height: BlockNumber,
756-
) -> Vec<(ProposalInit, mpsc::Receiver<ContextT::ProposalPart>)> {
757-
loop {
758-
let Some(entry) = self.cached_proposals.first_entry() else {
759-
return Vec::new();
760-
};
761-
match entry.key().cmp(&height) {
762-
std::cmp::Ordering::Greater => return vec![],
763-
std::cmp::Ordering::Equal => {
764-
let round_to_proposals = entry.remove();
765-
return round_to_proposals.into_values().collect();
766-
}
767-
std::cmp::Ordering::Less => {
768-
entry.remove();
769-
}
770-
}
771-
}
772-
}
773-
774-
/// Filters the cached messages:
775-
/// - returns (and removes from stored votes) all of the current height votes.
776-
/// - drops votes from earlier heights.
777-
/// - retains future votes in the cache.
778-
fn get_current_height_votes(&mut self, height: BlockNumber) -> Vec<Vote> {
779-
// Depends on `future_votes` being sorted by height.
780-
loop {
781-
let Some(entry) = self.future_votes.first_entry() else {
782-
return Vec::new();
783-
};
784-
match entry.key().cmp(&height) {
785-
std::cmp::Ordering::Greater => return Vec::new(),
786-
std::cmp::Ordering::Equal => return entry.remove(),
787-
std::cmp::Ordering::Less => {
788-
entry.remove();
789-
}
790-
}
791-
}
792-
}
793-
794-
fn report_max_cached_block_number_metric(&self, height: BlockNumber) {
795-
// If nothing is cached use current height as "max".
796-
let max_cached_block_number = self.cached_proposals.keys().max().unwrap_or(&height);
797-
CONSENSUS_MAX_CACHED_BLOCK_NUMBER.set_lossy(max_cached_block_number.0);
798-
}
799-
800829
fn should_cache_msg(
801830
&self,
802831
current_height: &BlockNumber,

0 commit comments

Comments
 (0)