Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 116 additions & 69 deletions crates/apollo_consensus/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,39 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
CONSENSUS_BLOCK_NUMBER.set_lossy(height.0);
self.cache.report_max_cached_block_number_metric(height);

if let Some(sync_result) = self.check_and_wait_for_sync(context, height).await? {
return Ok(sync_result);
}

let (mut shc, mut shc_events) =
self.initialize_single_height_consensus(context, height).await;

if let Some(decision) = self
.process_start_height(context, height, &mut shc, &mut shc_events, broadcast_channels)
.await?
{
error!("Decision reached before executing requests. {:?}", decision);
return Ok(RunHeightRes::Decision(decision));
}

self.process_consensus_events(
context,
height,
&mut shc,
&mut shc_events,
broadcast_channels,
proposals_receiver,
)
.await
}

/// Check if we need to sync and wait if necessary.
/// Returns Some(RunHeightRes::Sync) if sync is needed, None otherwise.
async fn check_and_wait_for_sync(
&mut self,
context: &mut ContextT,
height: BlockNumber,
) -> Result<Option<RunHeightRes>, ConsensusError> {
// If we already voted for this height, do not proceed until we sync to this height.
// Otherwise, just check if we can sync to this height, immediately. If not, proceed with
// consensus.
Expand All @@ -400,61 +433,122 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
self.last_voted_height_at_initialization.unwrap().0
);
self.wait_until_sync_reaches_height(height, context).await;
return Ok(RunHeightRes::Sync);
return Ok(Some(RunHeightRes::Sync));
} else if context.try_sync(height).await {
return Ok(RunHeightRes::Sync);
return Ok(Some(RunHeightRes::Sync));
}
Ok(None)
}

/// Initialize consensus for a height: get validators, create SHC, and set up events.
async fn initialize_single_height_consensus(
&mut self,
context: &mut ContextT,
height: BlockNumber,
) -> (SingleHeightConsensus, FuturesUnordered<BoxFuture<'static, StateMachineEvent>>) {
let validators = context.validators(height).await;
let is_observer = !validators.contains(&self.consensus_config.dynamic_config.validator_id);
info!(
"START_HEIGHT: running consensus for height {:?}. is_observer: {}, validators: {:?}",
height, is_observer, validators,
);

let mut shc = SingleHeightConsensus::new(
let shc = SingleHeightConsensus::new(
height,
is_observer,
self.consensus_config.dynamic_config.validator_id,
validators,
self.quorum_type,
self.consensus_config.dynamic_config.timeouts.clone(),
);
let mut shc_events = FuturesUnordered::new();
let shc_events = FuturesUnordered::new();

(shc, shc_events)
}

/// Process the start of a height: call shc.start, process cached proposals/votes, and execute
/// initial requests. Returns Some(Decision) if a decision was reached (with error logging),
/// None otherwise.
async fn process_start_height(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
shc_events: &mut FuturesUnordered<BoxFuture<'static, StateMachineEvent>>,
broadcast_channels: &mut BroadcastVoteChannel,
) -> Result<Option<Decision>, ConsensusError> {
self.cache.report_cached_votes_metric(height);
let mut pending_requests = {
let leader_fn = make_leader_fn(context, height);
match shc.start(&leader_fn)? {
ShcReturn::Decision(decision) => {
// Start should generate either StartValidateProposal (validator) or
// StartBuildProposal (proposer). We do not enforce this
// since the Manager is intentionally not meant to
// understand consensus in detail.
return Ok(Some(decision));
}
ShcReturn::Requests(requests) => requests,
}
};

match self.start_height(context, height, &mut shc).await? {
ShcReturn::Decision(decision) => {
return Ok(RunHeightRes::Decision(decision));
let cached_proposals = self.cache.get_current_height_proposals(height);
trace!("Cached proposals for height {}: {:?}", height, cached_proposals);
for (init, content_receiver) in cached_proposals {
match self
.handle_proposal_known_init(context, height, shc, init, content_receiver)
.await?
{
ShcReturn::Decision(decision) => {
return Ok(Some(decision));
}
ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests),
}
ShcReturn::Requests(requests) => {
// Reflect initial height/round to context before executing requests.
context.set_height_and_round(height, shc.current_round()).await;
self.execute_requests(
context,
height,
requests,
&mut shc_events,
broadcast_channels,
)
.await?;
}

let cached_votes = self.cache.get_current_height_votes(height);
trace!("Cached votes for height {}: {:?}", height, cached_votes);
for msg in cached_votes {
let leader_fn = make_leader_fn(context, height);
match shc.handle_vote(&leader_fn, msg)? {
ShcReturn::Decision(decision) => {
return Ok(Some(decision));
}
ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests),
}
}

// Loop over incoming proposals, messages, and self generated events.
// Reflect initial height/round to context before executing requests.
context.set_height_and_round(height, shc.current_round()).await;
self.execute_requests(context, height, pending_requests, shc_events, broadcast_channels)
.await?;
Ok(None)
}

/// Main consensus loop: handles incoming proposals, votes, events, and sync checks.
async fn process_consensus_events(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
shc_events: &mut FuturesUnordered<BoxFuture<'static, StateMachineEvent>>,
broadcast_channels: &mut BroadcastVoteChannel,
proposals_receiver: &mut mpsc::Receiver<mpsc::Receiver<ContextT::ProposalPart>>,
) -> Result<RunHeightRes, ConsensusError> {
let clock = DefaultClock;
let sync_retry_interval = self.consensus_config.dynamic_config.sync_retry_interval;
let mut sync_poll_deadline = clock.now() + sync_retry_interval;
loop {
self.cache.report_max_cached_block_number_metric(height);
let shc_return = tokio::select! {
message = broadcast_channels.broadcasted_messages_receiver.next() => {
self.handle_vote(context, height, Some(&mut shc), message, broadcast_channels).await?
self.handle_vote(context, height, Some(shc), message, broadcast_channels).await?
},
content_receiver = proposals_receiver.next() => {
self.handle_proposal(
context,
height,
Some(&mut shc),
Some(shc),
content_receiver
)
.await?
Expand Down Expand Up @@ -482,7 +576,7 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
context,
height,
requests,
&mut shc_events,
shc_events,
broadcast_channels,
)
.await?;
Expand All @@ -491,53 +585,6 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
}
}

async fn start_height(
&mut self,
context: &mut ContextT,
height: BlockNumber,
shc: &mut SingleHeightConsensus,
) -> Result<ShcReturn, ConsensusError> {
self.cache.report_cached_votes_metric(height);
let mut pending_requests = {
let leader_fn = make_leader_fn(context, height);
match shc.start(&leader_fn)? {
decision @ ShcReturn::Decision(_) => {
// Start should generate either StartValidateProposal (validator) or
// StartBuildProposal (proposer). We do not enforce this
// since the Manager is intentionally not meant to
// understand consensus in detail.
error!("Decision reached at start of height. {:?}", decision);
return Ok(decision);
}
ShcReturn::Requests(requests) => requests,
}
};

let cached_proposals = self.cache.get_current_height_proposals(height);
trace!("Cached proposals for height {}: {:?}", height, cached_proposals);
for (init, content_receiver) in cached_proposals {
match self
.handle_proposal_known_init(context, height, shc, init, content_receiver)
.await?
{
decision @ ShcReturn::Decision(_) => return Ok(decision),
ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests),
}
}

let cached_votes = self.cache.get_current_height_votes(height);
trace!("Cached votes for height {}: {:?}", height, cached_votes);
for msg in cached_votes {
let leader_fn = make_leader_fn(context, height);
match shc.handle_vote(&leader_fn, msg)? {
decision @ ShcReturn::Decision(_) => return Ok(decision),
ShcReturn::Requests(new_requests) => pending_requests.extend(new_requests),
}
}

Ok(ShcReturn::Requests(pending_requests))
}

// Handle a new proposal receiver from the network.
// shc - None if the height was just completed and we should drop the message.
async fn handle_proposal(
Expand Down