Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions crates/apollo_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ use crate::votes_threshold::QuorumType;
/// blocking further calls to itself.
#[derive(Debug, PartialEq)]
#[cfg_attr(test, derive(EnumAsInner))]
pub enum ShcReturn {
pub(crate) enum ShcReturn {
Tasks(Vec<ShcTask>),
Decision(Decision),
}

/// A task which should be run without blocking calls to SHC.
#[derive(Debug)]
#[cfg_attr(test, derive(EnumAsInner))]
pub enum ShcTask {
pub(crate) enum ShcTask {
TimeoutPropose(Duration, StateMachineEvent),
TimeoutPrevote(Duration, StateMachineEvent),
TimeoutPrecommit(Duration, StateMachineEvent),
Expand Down Expand Up @@ -95,7 +95,7 @@ impl PartialEq for ShcTask {
}

impl ShcTask {
pub async fn run(self) -> StateMachineEvent {
pub(crate) async fn run(self) -> StateMachineEvent {
trace!("Running task: {:?}", self);
match self {
ShcTask::TimeoutPropose(duration, event)
Expand Down Expand Up @@ -135,7 +135,6 @@ impl ShcTask {
pub(crate) struct SingleHeightConsensus {
height: BlockNumber,
validators: Vec<ValidatorId>,
id: ValidatorId,
timeouts: TimeoutsConfig,
state_machine: StateMachine,
proposals: HashMap<Round, Option<ProposalCommitment>>,
Expand All @@ -161,7 +160,6 @@ impl SingleHeightConsensus {
Self {
height,
validators,
id,
timeouts,
state_machine,
proposals: HashMap::new(),
Expand Down Expand Up @@ -230,7 +228,7 @@ impl SingleHeightConsensus {
}

#[instrument(skip_all)]
pub async fn handle_event<ContextT: ConsensusContext>(
pub(crate) async fn handle_event<ContextT: ConsensusContext>(
&mut self,
context: &mut ContextT,
event: StateMachineEvent,
Expand Down Expand Up @@ -467,8 +465,12 @@ impl SingleHeightConsensus {

// TODO(Matan): Figure out how to handle failed proposal building. I believe this should be
// handled by applying timeoutPropose when we are the leader.
let init =
ProposalInit { height: self.height, round, proposer: self.id, valid_round: None };
let init = ProposalInit {
height: self.height,
round,
proposer: self.state_machine.validator_id(),
valid_round: None,
};
CONSENSUS_BUILD_PROPOSAL_TOTAL.increment(1);
// TODO(Asmaa): Reconsider: we should keep the builder's timeout bounded independently of
// the consensus proposal timeout. We currently use the base (round 0) proposal
Expand Down Expand Up @@ -506,7 +508,7 @@ impl SingleHeightConsensus {
let init = ProposalInit {
height: self.height,
round,
proposer: self.id,
proposer: self.state_machine.validator_id(),
valid_round: Some(valid_round),
};
CONSENSUS_REPROPOSALS.increment(1);
Expand Down Expand Up @@ -542,9 +544,9 @@ impl SingleHeightConsensus {
height: self.height.0,
round,
proposal_commitment: proposal_id,
voter: self.id,
voter: self.state_machine.validator_id(),
};
if let Some(old) = votes.insert((round, self.id), vote.clone()) {
if let Some(old) = votes.insert((round, self.state_machine.validator_id()), vote.clone()) {
return Err(ConsensusError::InternalInconsistency(format!(
"State machine should not send repeat votes: old={old:?}, new={vote:?}"
)));
Expand Down
36 changes: 20 additions & 16 deletions crates/apollo_consensus/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::votes_threshold::{QuorumType, VotesThreshold, ROUND_SKIP_THRESHOLD};

/// Events which the state machine sends/receives.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum StateMachineEvent {
pub(crate) enum StateMachineEvent {
/// Sent by the state machine when a block is required to propose (ProposalCommitment is always
/// None). While waiting for the response of GetProposal, the state machine will buffer all
/// other events. The caller *must* respond with a valid proposal id for this height to the
Expand All @@ -53,7 +53,7 @@ pub enum StateMachineEvent {
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum Step {
pub(crate) enum Step {
Propose,
Prevote,
Precommit,
Expand All @@ -65,7 +65,7 @@ pub enum Step {
///
/// Each height is begun with a call to `start`, with no further calls to it.
#[derive(Serialize, Deserialize)]
pub struct StateMachine {
pub(crate) struct StateMachine {
id: ValidatorId,
round: Round,
step: Step,
Expand All @@ -91,7 +91,7 @@ pub struct StateMachine {

impl StateMachine {
/// total_weight - the total voting weight of all validators for this height.
pub fn new(
pub(crate) fn new(
id: ValidatorId,
total_weight: u64,
is_observer: bool,
Expand Down Expand Up @@ -120,22 +120,26 @@ impl StateMachine {
}
}

pub fn round(&self) -> Round {
pub(crate) fn round(&self) -> Round {
self.round
}

pub fn total_weight(&self) -> u64 {
pub(crate) fn total_weight(&self) -> u64 {
self.total_weight
}

pub fn quorum(&self) -> &VotesThreshold {
pub(crate) fn quorum(&self) -> &VotesThreshold {
&self.quorum
}

pub(crate) fn validator_id(&self) -> ValidatorId {
self.id
}

/// Starts the state machine, effectively calling `StartRound(0)` from the paper. This is
/// needed to trigger the first leader to propose.
/// See [`GetProposal`](StateMachineEvent::GetProposal)
pub fn start<LeaderFn>(&mut self, leader_fn: &LeaderFn) -> VecDeque<StateMachineEvent>
pub(crate) fn start<LeaderFn>(&mut self, leader_fn: &LeaderFn) -> VecDeque<StateMachineEvent>
where
LeaderFn: Fn(Round) -> ValidatorId,
{
Expand All @@ -151,7 +155,7 @@ impl StateMachine {
/// events back to the state machine, as it makes sure to handle them before returning.
// This means that the StateMachine handles events the same regardless of whether it was sent by
// self or a peer. This is in line with the Algorithm 1 in the paper and keeps the code simpler.
pub fn handle_event<LeaderFn>(
pub(crate) fn handle_event<LeaderFn>(
&mut self,
event: StateMachineEvent,
leader_fn: &LeaderFn,
Expand All @@ -178,7 +182,7 @@ impl StateMachine {
self.handle_enqueued_events(leader_fn)
}

fn handle_enqueued_events<LeaderFn>(
pub(crate) fn handle_enqueued_events<LeaderFn>(
&mut self,
leader_fn: &LeaderFn,
) -> VecDeque<StateMachineEvent>
Expand Down Expand Up @@ -219,7 +223,7 @@ impl StateMachine {
output_events
}

fn handle_event_internal<LeaderFn>(
pub(crate) fn handle_event_internal<LeaderFn>(
&mut self,
event: StateMachineEvent,
leader_fn: &LeaderFn,
Expand Down Expand Up @@ -258,7 +262,7 @@ impl StateMachine {
}
}

fn handle_get_proposal(
pub(crate) fn handle_get_proposal(
&mut self,
proposal_id: Option<ProposalCommitment>,
round: u32,
Expand All @@ -271,7 +275,7 @@ impl StateMachine {
}

// A proposal from a peer (or self) node.
fn handle_proposal<LeaderFn>(
pub(crate) fn handle_proposal<LeaderFn>(
&mut self,
proposal_id: Option<ProposalCommitment>,
round: u32,
Expand All @@ -286,7 +290,7 @@ impl StateMachine {
self.map_round_to_upons(round, leader_fn)
}

fn handle_timeout_propose(&mut self, round: u32) -> VecDeque<StateMachineEvent> {
pub(crate) fn handle_timeout_propose(&mut self, round: u32) -> VecDeque<StateMachineEvent> {
if self.step != Step::Propose || round != self.round {
return VecDeque::new();
};
Expand All @@ -301,7 +305,7 @@ impl StateMachine {
}

// A prevote from a peer (or self) node.
fn handle_prevote<LeaderFn>(
pub(crate) fn handle_prevote<LeaderFn>(
&mut self,
proposal_id: Option<ProposalCommitment>,
round: u32,
Expand All @@ -316,7 +320,7 @@ impl StateMachine {
self.map_round_to_upons(round, leader_fn)
}

fn handle_timeout_prevote(&mut self, round: u32) -> VecDeque<StateMachineEvent> {
pub(crate) fn handle_timeout_prevote(&mut self, round: u32) -> VecDeque<StateMachineEvent> {
if self.step != Step::Prevote || round != self.round {
return VecDeque::new();
};
Expand Down