Skip to content

Commit ff4fef8

Browse files
apollo_consensus: extract run_height_inner into focused helper funcs
1 parent ec18b70 commit ff4fef8

File tree

1 file changed

+116
-69
lines changed

1 file changed

+116
-69
lines changed

crates/apollo_consensus/src/manager.rs

Lines changed: 116 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,39 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
385385
CONSENSUS_BLOCK_NUMBER.set_lossy(height.0);
386386
self.cache.report_max_cached_block_number_metric(height);
387387

388+
if let Some(sync_result) = self.check_and_wait_for_sync(context, height).await? {
389+
return Ok(sync_result);
390+
}
391+
392+
let (mut shc, mut shc_events) =
393+
self.initialize_single_height_consensus(context, height).await;
394+
395+
if let Some(decision) = self
396+
.process_start_height(context, height, &mut shc, &mut shc_events, broadcast_channels)
397+
.await?
398+
{
399+
error!("Decision reached before executing requests. {:?}", decision);
400+
return Ok(RunHeightRes::Decision(decision));
401+
}
402+
403+
self.process_consensus_events(
404+
context,
405+
height,
406+
&mut shc,
407+
&mut shc_events,
408+
broadcast_channels,
409+
proposals_receiver,
410+
)
411+
.await
412+
}
413+
414+
/// Check if we need to sync and wait if necessary.
415+
/// Returns Some(RunHeightRes::Sync) if sync is needed, None otherwise.
416+
async fn check_and_wait_for_sync(
417+
&mut self,
418+
context: &mut ContextT,
419+
height: BlockNumber,
420+
) -> Result<Option<RunHeightRes>, ConsensusError> {
388421
// If we already voted for this height, do not proceed until we sync to this height.
389422
// Otherwise, just check if we can sync to this height, immediately. If not, proceed with
390423
// consensus.
@@ -400,61 +433,122 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
400433
self.last_voted_height_at_initialization.unwrap().0
401434
);
402435
self.wait_until_sync_reaches_height(height, context).await;
403-
return Ok(RunHeightRes::Sync);
436+
return Ok(Some(RunHeightRes::Sync));
404437
} else if context.try_sync(height).await {
405-
return Ok(RunHeightRes::Sync);
438+
return Ok(Some(RunHeightRes::Sync));
406439
}
440+
Ok(None)
441+
}
407442

443+
/// Initialize consensus for a height: get validators, create SHC, and set up events.
444+
async fn initialize_single_height_consensus(
445+
&mut self,
446+
context: &mut ContextT,
447+
height: BlockNumber,
448+
) -> (SingleHeightConsensus, FuturesUnordered<BoxFuture<'static, StateMachineEvent>>) {
408449
let validators = context.validators(height).await;
409450
let is_observer = !validators.contains(&self.consensus_config.dynamic_config.validator_id);
410451
info!(
411452
"START_HEIGHT: running consensus for height {:?}. is_observer: {}, validators: {:?}",
412453
height, is_observer, validators,
413454
);
414455

415-
let mut shc = SingleHeightConsensus::new(
456+
let shc = SingleHeightConsensus::new(
416457
height,
417458
is_observer,
418459
self.consensus_config.dynamic_config.validator_id,
419460
validators,
420461
self.quorum_type,
421462
self.consensus_config.dynamic_config.timeouts.clone(),
422463
);
423-
let mut shc_events = FuturesUnordered::new();
464+
let shc_events = FuturesUnordered::new();
465+
466+
(shc, shc_events)
467+
}
468+
469+
/// Process the start of a height: call shc.start, process cached proposals/votes, and execute
470+
/// initial requests. Returns Some(Decision) if a decision was reached (with error logging),
471+
/// None otherwise.
472+
async fn process_start_height(
473+
&mut self,
474+
context: &mut ContextT,
475+
height: BlockNumber,
476+
shc: &mut SingleHeightConsensus,
477+
shc_events: &mut FuturesUnordered<BoxFuture<'static, StateMachineEvent>>,
478+
broadcast_channels: &mut BroadcastVoteChannel,
479+
) -> Result<Option<Decision>, ConsensusError> {
480+
self.cache.report_cached_votes_metric(height);
481+
let mut pending_requests = {
482+
let leader_fn = make_leader_fn(context, height);
483+
match shc.start(&leader_fn)? {
484+
ShcReturn::Decision(decision) => {
485+
// Start should generate either StartValidateProposal (validator) or
486+
// StartBuildProposal (proposer). We do not enforce this
487+
// since the Manager is intentionally not meant to
488+
// understand consensus in detail.
489+
return Ok(Some(decision));
490+
}
491+
ShcReturn::Requests(requests) => requests,
492+
}
493+
};
424494

425-
match self.start_height(context, height, &mut shc).await? {
426-
ShcReturn::Decision(decision) => {
427-
return Ok(RunHeightRes::Decision(decision));
495+
let cached_proposals = self.cache.get_current_height_proposals(height);
496+
trace!("Cached proposals for height {}: {:?}", height, cached_proposals);
497+
for (init, content_receiver) in cached_proposals {
498+
match self
499+
.handle_proposal_known_init(context, height, shc, init, content_receiver)
500+
.await?
501+
{
502+
ShcReturn::Decision(decision) => {
503+
return Ok(Some(decision));
504+
}
505+
ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests),
428506
}
429-
ShcReturn::Requests(requests) => {
430-
// Reflect initial height/round to context before executing requests.
431-
context.set_height_and_round(height, shc.current_round()).await;
432-
self.execute_requests(
433-
context,
434-
height,
435-
requests,
436-
&mut shc_events,
437-
broadcast_channels,
438-
)
439-
.await?;
507+
}
508+
509+
let cached_votes = self.cache.get_current_height_votes(height);
510+
trace!("Cached votes for height {}: {:?}", height, cached_votes);
511+
for msg in cached_votes {
512+
let leader_fn = make_leader_fn(context, height);
513+
match shc.handle_vote(&leader_fn, msg)? {
514+
ShcReturn::Decision(decision) => {
515+
return Ok(Some(decision));
516+
}
517+
ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests),
440518
}
441519
}
442520

443-
// Loop over incoming proposals, messages, and self generated events.
521+
// Reflect initial height/round to context before executing requests.
522+
context.set_height_and_round(height, shc.current_round()).await;
523+
self.execute_requests(context, height, pending_requests, shc_events, broadcast_channels)
524+
.await?;
525+
Ok(None)
526+
}
527+
528+
/// Main consensus loop: handles incoming proposals, votes, events, and sync checks.
529+
async fn process_consensus_events(
530+
&mut self,
531+
context: &mut ContextT,
532+
height: BlockNumber,
533+
shc: &mut SingleHeightConsensus,
534+
shc_events: &mut FuturesUnordered<BoxFuture<'static, StateMachineEvent>>,
535+
broadcast_channels: &mut BroadcastVoteChannel,
536+
proposals_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
537+
) -> Result<RunHeightRes, ConsensusError> {
444538
let clock = DefaultClock;
445539
let sync_retry_interval = self.consensus_config.dynamic_config.sync_retry_interval;
446540
let mut sync_poll_deadline = clock.now() + sync_retry_interval;
447541
loop {
448542
self.cache.report_max_cached_block_number_metric(height);
449543
let shc_return = tokio::select! {
450544
message = broadcast_channels.broadcasted_messages_receiver.next() => {
451-
self.handle_vote(context, height, Some(&mut shc), message, broadcast_channels).await?
545+
self.handle_vote(context, height, Some(shc), message, broadcast_channels).await?
452546
},
453547
content_receiver = proposals_receiver.next() => {
454548
self.handle_proposal(
455549
context,
456550
height,
457-
Some(&mut shc),
551+
Some(shc),
458552
content_receiver
459553
)
460554
.await?
@@ -482,7 +576,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
482576
context,
483577
height,
484578
requests,
485-
&mut shc_events,
579+
shc_events,
486580
broadcast_channels,
487581
)
488582
.await?;
@@ -491,53 +585,6 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
491585
}
492586
}
493587

494-
async fn start_height(
495-
&mut self,
496-
context: &mut ContextT,
497-
height: BlockNumber,
498-
shc: &mut SingleHeightConsensus,
499-
) -> Result<ShcReturn, ConsensusError> {
500-
self.cache.report_cached_votes_metric(height);
501-
let mut pending_requests = {
502-
let leader_fn = make_leader_fn(context, height);
503-
match shc.start(&leader_fn)? {
504-
decision @ ShcReturn::Decision(_) => {
505-
// Start should generate either StartValidateProposal (validator) or
506-
// StartBuildProposal (proposer). We do not enforce this
507-
// since the Manager is intentionally not meant to
508-
// understand consensus in detail.
509-
error!("Decision reached at start of height. {:?}", decision);
510-
return Ok(decision);
511-
}
512-
ShcReturn::Requests(requests) => requests,
513-
}
514-
};
515-
516-
let cached_proposals = self.cache.get_current_height_proposals(height);
517-
trace!("Cached proposals for height {}: {:?}", height, cached_proposals);
518-
for (init, content_receiver) in cached_proposals {
519-
match self
520-
.handle_proposal_known_init(context, height, shc, init, content_receiver)
521-
.await?
522-
{
523-
decision @ ShcReturn::Decision(_) => return Ok(decision),
524-
ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests),
525-
}
526-
}
527-
528-
let cached_votes = self.cache.get_current_height_votes(height);
529-
trace!("Cached votes for height {}: {:?}", height, cached_votes);
530-
for msg in cached_votes {
531-
let leader_fn = make_leader_fn(context, height);
532-
match shc.handle_vote(&leader_fn, msg)? {
533-
decision @ ShcReturn::Decision(_) => return Ok(decision),
534-
ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests),
535-
}
536-
}
537-
538-
Ok(ShcReturn::Requests(pending_requests))
539-
}
540-
541588
// Handle a new proposal receiver from the network.
542589
// shc - None if the height was just completed and we should drop the message.
543590
async fn handle_proposal(

0 commit comments

Comments
 (0)