Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 100 additions & 68 deletions crates/apollo_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,82 +148,114 @@ impl SingleHeightConsensus {
LeaderFn: Fn(Round) -> ValidatorId,
{
trace!("Received StateMachineEvent: {:?}", event);
let ret = match event {
StateMachineEvent::TimeoutPropose(_round)
| StateMachineEvent::TimeoutPrevote(_round)
| StateMachineEvent::TimeoutPrecommit(_round) => {
let sm_requests = self.state_machine.handle_event(event, leader_fn);
self.handle_state_machine_requests(sm_requests)
}
StateMachineEvent::VoteBroadcasted(vote) => {
let last_vote = match vote.vote_type {
VoteType::Prevote => {
self.state_machine.last_self_prevote().ok_or_else(|| {
ConsensusError::InternalInconsistency("No prevote to send".to_string())
})?
}
VoteType::Precommit => {
self.state_machine.last_self_precommit().ok_or_else(|| {
ConsensusError::InternalInconsistency(
"No precommit to send".to_string(),
)
})?
}
};
if last_vote.round > vote.round {
// Only rebroadcast the newest vote.
return Ok(ShcReturn::Requests(VecDeque::new()));
}
assert_eq!(last_vote, vote);
trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}");
Ok(ShcReturn::Requests(VecDeque::from([SMRequest::BroadcastVote(last_vote)])))
}
match event {
StateMachineEvent::TimeoutPropose(_)
| StateMachineEvent::TimeoutPrevote(_)
| StateMachineEvent::TimeoutPrecommit(_) => self.handle_timeout_event(leader_fn, event),
StateMachineEvent::VoteBroadcasted(vote) => self.handle_vote_broadcasted(vote),
StateMachineEvent::FinishedValidation(proposal_id, round, valid_round) => {
debug!(
proposer = %leader_fn(round),
%round,
?valid_round,
proposal_commitment = ?proposal_id,
node_round = self.state_machine.round(),
"Validated proposal.",
);
if proposal_id.is_some() {
CONSENSUS_PROPOSALS_VALIDATED.increment(1);
} else {
CONSENSUS_PROPOSALS_INVALID.increment(1);
}

// Cleanup: validation for round {round} finished, so remove it from the pending
// set. This doesn't affect logic.
self.pending_validation_rounds.remove(&round);
let requests = self.state_machine.handle_event(event, leader_fn);
self.handle_state_machine_requests(requests)
self.handle_finished_validation(leader_fn, proposal_id, round, valid_round)
}
StateMachineEvent::FinishedBuilding(proposal_id, round) => {
if proposal_id.is_none() {
CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1);
}
CONSENSUS_BUILD_PROPOSAL_TOTAL.increment(1);
// Ensure SM has no proposal recorded yet for this round when proposing.
assert!(
!self.state_machine.has_proposal_for_round(round),
"There should be no entry for round {round} when proposing"
);

assert_eq!(
round,
self.state_machine.round(),
"State machine should not progress while awaiting proposal"
);
debug!(%round, proposal_commitment = ?proposal_id, "Built proposal.");
let requests = self.state_machine.handle_event(event, &leader_fn);
self.handle_state_machine_requests(requests)
self.handle_finished_building(leader_fn, proposal_id, round)
}
StateMachineEvent::Prevote(_) | StateMachineEvent::Precommit(_) => {
unreachable!("Peer votes must be handled via handle_vote")
}
}
}

fn handle_timeout_event<LeaderFn>(
&mut self,
leader_fn: &LeaderFn,
event: StateMachineEvent,
) -> Result<ShcReturn, ConsensusError>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
let sm_requests = self.state_machine.handle_event(event, leader_fn);
self.handle_state_machine_requests(sm_requests)
}

fn handle_vote_broadcasted(&mut self, vote: Vote) -> Result<ShcReturn, ConsensusError> {
let last_vote = match vote.vote_type {
VoteType::Prevote => self.state_machine.last_self_prevote().ok_or_else(|| {
ConsensusError::InternalInconsistency("No prevote to send".to_string())
})?,
VoteType::Precommit => self.state_machine.last_self_precommit().ok_or_else(|| {
ConsensusError::InternalInconsistency("No precommit to send".to_string())
})?,
};
ret
if last_vote.round > vote.round {
// Only rebroadcast the newest vote.
return Ok(ShcReturn::Requests(VecDeque::new()));
}
assert_eq!(last_vote, vote);
trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}");
Ok(ShcReturn::Requests(VecDeque::from([SMRequest::BroadcastVote(last_vote)])))
}

fn handle_finished_validation<LeaderFn>(
&mut self,
leader_fn: &LeaderFn,
proposal_id: Option<ProposalCommitment>,
round: Round,
valid_round: Option<Round>,
) -> Result<ShcReturn, ConsensusError>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
debug!(
proposer = %leader_fn(round),
%round,
?valid_round,
proposal_commitment = ?proposal_id,
node_round = self.state_machine.round(),
"Validated proposal.",
);
if proposal_id.is_some() {
CONSENSUS_PROPOSALS_VALIDATED.increment(1);
} else {
CONSENSUS_PROPOSALS_INVALID.increment(1);
}
// Cleanup: validation for round {round} finished, so remove it from the pending
// set. This doesn't affect logic.
self.pending_validation_rounds.remove(&round);
let requests = self.state_machine.handle_event(
StateMachineEvent::FinishedValidation(proposal_id, round, None),
leader_fn,
);
self.handle_state_machine_requests(requests)
}

fn handle_finished_building<LeaderFn>(
&mut self,
leader_fn: &LeaderFn,
proposal_id: Option<ProposalCommitment>,
round: Round,
) -> Result<ShcReturn, ConsensusError>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
if proposal_id.is_none() {
CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1);
}
CONSENSUS_BUILD_PROPOSAL_TOTAL.increment(1);
// Ensure SM has no proposal recorded yet for this round when proposing.
assert!(
!self.state_machine.has_proposal_for_round(round),
"There should be no entry for round {round} when proposing"
);
assert_eq!(
round,
self.state_machine.round(),
"State machine should not progress while awaiting proposal"
);
debug!(%round, proposal_commitment = ?proposal_id, "Built proposal.");
let requests = self
.state_machine
.handle_event(StateMachineEvent::FinishedBuilding(proposal_id, round), leader_fn);
self.handle_state_machine_requests(requests)
}

/// Handle vote messages from peer nodes.
Expand Down
Loading