diff --git a/crates/apollo_consensus/src/single_height_consensus.rs b/crates/apollo_consensus/src/single_height_consensus.rs index dbcbbe0f767..33e9d91a602 100644 --- a/crates/apollo_consensus/src/single_height_consensus.rs +++ b/crates/apollo_consensus/src/single_height_consensus.rs @@ -32,7 +32,7 @@ use crate::metrics::{ CONSENSUS_PROPOSALS_VALID_INIT, CONSENSUS_REPROPOSALS, }; -use crate::state_machine::{StateMachine, StateMachineEvent}; +use crate::state_machine::{SMRequest, StateMachine, StateMachineEvent}; use crate::storage::HeightVotedStorageTrait; use crate::types::{ ConsensusContext, @@ -60,8 +60,8 @@ pub(crate) enum ShcTask { TimeoutPropose(Duration, StateMachineEvent), TimeoutPrevote(Duration, StateMachineEvent), TimeoutPrecommit(Duration, StateMachineEvent), - Prevote(Duration, StateMachineEvent), - Precommit(Duration, StateMachineEvent), + /// Periodic rebroadcast of the latest self vote of the given type. + Rebroadcast(Duration, Vote), /// Building a proposal is handled in 3 stages: /// 1. The SHC requests a block to be built from the context. /// 2. SHC returns, allowing the context to build the block while the Manager awaits the result @@ -85,9 +85,10 @@ impl PartialEq for ShcTask { match (self, other) { (ShcTask::TimeoutPropose(d1, e1), ShcTask::TimeoutPropose(d2, e2)) | (ShcTask::TimeoutPrevote(d1, e1), ShcTask::TimeoutPrevote(d2, e2)) - | (ShcTask::TimeoutPrecommit(d1, e1), ShcTask::TimeoutPrecommit(d2, e2)) - | (ShcTask::Prevote(d1, e1), ShcTask::Prevote(d2, e2)) - | (ShcTask::Precommit(d1, e1), ShcTask::Precommit(d2, e2)) => d1 == d2 && e1 == e2, + | (ShcTask::TimeoutPrecommit(d1, e1), ShcTask::TimeoutPrecommit(d2, e2)) => { + d1 == d2 && e1 == e2 + } + (ShcTask::Rebroadcast(d1, v1), ShcTask::Rebroadcast(d2, v2)) => d1 == d2 && v1 == v2, (ShcTask::BuildProposal(r1, _), ShcTask::BuildProposal(r2, _)) => r1 == r2, (ShcTask::ValidateProposal(pi1, _), ShcTask::ValidateProposal(pi2, _)) => pi1 == pi2, _ => false, @@ -101,21 +102,23 @@ impl ShcTask { match self { ShcTask::TimeoutPropose(duration, event) | ShcTask::TimeoutPrevote(duration, event) - | ShcTask::TimeoutPrecommit(duration, event) - | ShcTask::Prevote(duration, event) - | ShcTask::Precommit(duration, event) => { + | ShcTask::TimeoutPrecommit(duration, event) => { tokio::time::sleep(duration).await; event } + ShcTask::Rebroadcast(duration, vote) => { + tokio::time::sleep(duration).await; + StateMachineEvent::RebroadcastVote(vote) + } ShcTask::BuildProposal(round, receiver) => { let proposal_id = receiver.await.ok(); - StateMachineEvent::GetProposal(proposal_id, round) + StateMachineEvent::FinishedBuilding(proposal_id, round) } ShcTask::ValidateProposal(init, block_receiver) => { // TODO(Asmaa): Consider if we want to differentiate between an interrupt and other // failures. let proposal_id = block_receiver.await.ok(); - StateMachineEvent::Proposal(proposal_id, init.round, init.valid_round) + StateMachineEvent::FinishedValidation(proposal_id, init.round, init.valid_round) } } } @@ -177,8 +180,8 @@ impl SingleHeightConsensus { let height = self.state_machine.height(); context.set_height_and_round(height, self.state_machine.round()).await; let leader_fn = |round: Round| -> ValidatorId { context.proposer(height, round) }; - let events = self.state_machine.start(&leader_fn); - let ret = self.handle_state_machine_events(context, events).await; + let requests = self.state_machine.start(&leader_fn); + let ret = self.handle_state_machine_requests(context, requests).await; // Defensive programming. We don't expect the height and round to have changed from the // start of this method. context.set_height_and_round(height, self.state_machine.round()).await; @@ -236,47 +239,51 @@ impl SingleHeightConsensus { event: StateMachineEvent, ) -> Result { trace!("Received StateMachineEvent: {:?}", event); + let height = self.state_machine.height(); + let leader_fn = |round: Round| -> ValidatorId { context.proposer(height, round) }; let ret = match event { StateMachineEvent::TimeoutPropose(_round) | StateMachineEvent::TimeoutPrevote(_round) | StateMachineEvent::TimeoutPrecommit(_round) => { self.handle_timeout(context, event).await } - StateMachineEvent::Prevote(vote) => { - let Some(last_vote) = self.state_machine.last_self_prevote() else { - return Err(ConsensusError::InternalInconsistency( - "No prevote to send".to_string(), - )); - }; - if last_vote.round > vote.round { - // Only replay the newest prevote. - return Ok(ShcReturn::Tasks(Vec::new())); + StateMachineEvent::RebroadcastVote(vote) => match vote.vote_type { + VoteType::Prevote => { + let Some(last_vote) = self.state_machine.last_self_prevote() else { + return Err(ConsensusError::InternalInconsistency( + "No prevote to send".to_string(), + )); + }; + if last_vote.round > vote.round { + // Only replay the newest prevote. + return Ok(ShcReturn::Tasks(Vec::new())); + } + trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}"); + context.broadcast(last_vote.clone()).await?; + Ok(ShcReturn::Tasks(vec![ShcTask::Rebroadcast( + self.timeouts.get_prevote_timeout(0), + vote, + )])) } - trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}"); - context.broadcast(last_vote.clone()).await?; - Ok(ShcReturn::Tasks(vec![ShcTask::Prevote( - self.timeouts.get_prevote_timeout(0), - StateMachineEvent::Prevote(vote), - )])) - } - StateMachineEvent::Precommit(vote) => { - let Some(last_vote) = self.state_machine.last_self_precommit() else { - return Err(ConsensusError::InternalInconsistency( - "No precommit to send".to_string(), - )); - }; - if last_vote.round > vote.round { - // Only replay the newest precommit. - return Ok(ShcReturn::Tasks(Vec::new())); + VoteType::Precommit => { + let Some(last_vote) = self.state_machine.last_self_precommit() else { + return Err(ConsensusError::InternalInconsistency( + "No precommit to send".to_string(), + )); + }; + if last_vote.round > vote.round { + // Only replay the newest precommit. + return Ok(ShcReturn::Tasks(Vec::new())); + } + trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}"); + context.broadcast(last_vote.clone()).await?; + Ok(ShcReturn::Tasks(vec![ShcTask::Rebroadcast( + self.timeouts.get_precommit_timeout(0), + vote, + )])) } - trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}"); - context.broadcast(last_vote.clone()).await?; - Ok(ShcReturn::Tasks(vec![ShcTask::Precommit( - self.timeouts.get_precommit_timeout(0), - StateMachineEvent::Precommit(vote), - )])) - } - StateMachineEvent::Proposal(proposal_id, round, valid_round) => { + }, + StateMachineEvent::FinishedValidation(proposal_id, round, valid_round) => { let height = self.state_machine.height(); let leader_fn = |round: Round| -> ValidatorId { context.proposer(height, round) }; debug!( @@ -296,10 +303,10 @@ impl SingleHeightConsensus { // Cleanup: validation for round {round} finished, so remove it from the pending // set. This doesn't affect logic. self.pending_validation_rounds.remove(&round); - let sm_events = self.state_machine.handle_event(event, &leader_fn); - self.handle_state_machine_events(context, sm_events).await + let requests = self.state_machine.handle_event(event, &leader_fn); + self.handle_state_machine_requests(context, requests).await } - StateMachineEvent::GetProposal(proposal_id, round) => { + StateMachineEvent::FinishedBuilding(proposal_id, round) => { if proposal_id.is_none() { CONSENSUS_BUILD_PROPOSAL_FAILED.increment(1); } @@ -315,12 +322,12 @@ impl SingleHeightConsensus { "State machine should not progress while awaiting proposal" ); debug!(%round, proposal_commitment = ?proposal_id, "Built proposal."); - let height = self.state_machine.height(); - let leader_fn = |round: Round| -> ValidatorId { context.proposer(height, round) }; - let sm_events = self.state_machine.handle_event(event, &leader_fn); - self.handle_state_machine_events(context, sm_events).await + let requests = self.state_machine.handle_event(event, &leader_fn); + self.handle_state_machine_requests(context, requests).await + } + StateMachineEvent::Prevote(_) | StateMachineEvent::Precommit(_) => { + unreachable!("Peer votes must be handled via handle_vote") } - _ => unimplemented!("Unexpected event: {:?}", event), }; context.set_height_and_round(self.state_machine.height(), self.state_machine.round()).await; ret @@ -333,8 +340,8 @@ impl SingleHeightConsensus { ) -> Result { let height = self.state_machine.height(); let leader_fn = |round: Round| -> ValidatorId { context.proposer(height, round) }; - let sm_events = self.state_machine.handle_event(event, &leader_fn); - self.handle_state_machine_events(context, sm_events).await + let sm_requests = self.state_machine.handle_event(event, &leader_fn); + self.handle_state_machine_requests(context, sm_requests).await } /// Handle vote messages from peer nodes. @@ -374,158 +381,118 @@ impl SingleHeightConsensus { info!("Accepting {:?}", vote); let height = self.state_machine.height(); let leader_fn = |round: Round| -> ValidatorId { context.proposer(height, round) }; - let sm_events = self.state_machine.handle_event(sm_vote, &leader_fn); - let ret = self.handle_state_machine_events(context, sm_events).await; + // TODO(Asmaa): consider calling handle_prevote/precommit instead of sending the vote event. + let requests = self.state_machine.handle_event(sm_vote, &leader_fn); + let ret = self.handle_state_machine_requests(context, requests).await; context.set_height_and_round(height, self.state_machine.round()).await; ret } - // Handle events output by the state machine. - async fn handle_state_machine_events( + // Handle requests output by the state machine. + async fn handle_state_machine_requests( &mut self, context: &mut ContextT, - mut events: VecDeque, + mut requests: VecDeque, ) -> Result { let mut ret_val = Vec::new(); - while let Some(event) = events.pop_front() { - trace!("Handling sm event: {:?}", event); - match event { - StateMachineEvent::GetProposal(proposal_id, round) => { - ret_val.extend( - self.handle_state_machine_get_proposal(context, proposal_id, round).await, + while let Some(request) = requests.pop_front() { + trace!("Handling sm request: {:?}", request); + match request { + SMRequest::StartBuildProposal(round) => { + // Ensure SM has no proposal recorded yet for this round when proposing. + assert!( + !self.state_machine.has_proposal_for_round(round), + "There should be no entry for round {round} when proposing" ); + // TODO(Matan): Figure out how to handle failed proposal building. I believe + // this should be handled by applying timeoutPropose when we + // are the leader. + let init = ProposalInit { + height: self.state_machine.height(), + round, + proposer: self.state_machine.validator_id(), + valid_round: None, + }; + CONSENSUS_BUILD_PROPOSAL_TOTAL.increment(1); + // TODO(Asmaa): Reconsider: we should keep the builder's timeout bounded + // independently of the consensus proposal timeout. We currently use the base + // (round 0) proposal timeout for building to avoid giving the Batcher more time + // when proposal time is extended for consensus. + let fin_receiver = + context.build_proposal(init, self.timeouts.get_proposal_timeout(0)).await; + ret_val.push(ShcTask::BuildProposal(round, fin_receiver)); } - StateMachineEvent::Proposal(proposal_id, round, valid_round) => { - self.handle_state_machine_proposal(context, proposal_id, round, valid_round) - .await; + SMRequest::BroadcastVote(vote) => { + let rebroadcast_task = match vote.vote_type { + VoteType::Prevote => { + ShcTask::Rebroadcast(self.timeouts.get_prevote_timeout(0), vote.clone()) + } + VoteType::Precommit => ShcTask::Rebroadcast( + self.timeouts.get_precommit_timeout(0), + vote.clone(), + ), + }; + // Ensure the voter matches this node. + assert_eq!(vote.voter, self.state_machine.validator_id()); + trace!("Writing voted height {} to storage", self.state_machine.height()); + self.height_voted_storage + .lock() + .expect( + "Lock should never be poisoned because there should never be \ + concurrent access.", + ) + .set_prev_voted_height(self.state_machine.height()) + .expect("Failed to write voted height {self.height} to storage"); + + info!("Broadcasting {vote:?}"); + context.broadcast(vote).await?; + ret_val.push(rebroadcast_task); } - StateMachineEvent::Decision(proposal_id, round) => { - return self.handle_state_machine_decision(proposal_id, round).await; - } - StateMachineEvent::Prevote(vote) => { - ret_val.extend(self.handle_state_machine_vote(context, vote).await?); - } - StateMachineEvent::Precommit(vote) => { - ret_val.extend(self.handle_state_machine_vote(context, vote).await?); - } - StateMachineEvent::TimeoutPropose(round) => { + SMRequest::ScheduleTimeoutPropose(round) => { ret_val.push(ShcTask::TimeoutPropose( self.timeouts.get_proposal_timeout(round), - event, + StateMachineEvent::TimeoutPropose(round), )); } - StateMachineEvent::TimeoutPrevote(round) => { + SMRequest::ScheduleTimeoutPrevote(round) => { ret_val.push(ShcTask::TimeoutPrevote( self.timeouts.get_prevote_timeout(round), - event, + StateMachineEvent::TimeoutPrevote(round), )); } - StateMachineEvent::TimeoutPrecommit(round) => { + SMRequest::ScheduleTimeoutPrecommit(round) => { ret_val.push(ShcTask::TimeoutPrecommit( self.timeouts.get_precommit_timeout(round), - event, + StateMachineEvent::TimeoutPrecommit(round), )); } + SMRequest::DecisionReached(proposal_id, round) => { + return self.handle_state_machine_decision(proposal_id, round).await; + } + SMRequest::Repropose(proposal_id, init) => { + // Make sure there is an existing proposal for the valid round and it matches + // the proposal ID. + let Some(valid_round) = init.valid_round else { + // Newly built proposals are handled by the BuildProposal flow. + continue; + }; + let existing = self.state_machine.proposal_id_for_round(valid_round); + assert!( + existing.is_some_and(|id| id == proposal_id), + "A proposal with ID {proposal_id:?} should exist for valid_round: \ + {valid_round}. Found: {existing:?}", + ); + CONSENSUS_REPROPOSALS.increment(1); + context.repropose(proposal_id, init).await; + } + SMRequest::StartValidateProposal(_) => { + unimplemented!("StartValidateProposal is not supported.") + } } } Ok(ShcReturn::Tasks(ret_val)) } - /// Initiate block building. See [`ShcTask::BuildProposal`] for more details on the full - /// proposal flow. - async fn handle_state_machine_get_proposal( - &mut self, - context: &mut ContextT, - proposal_id: Option, - round: Round, - ) -> Vec { - assert!( - proposal_id.is_none(), - "StateMachine is requesting a new proposal, but provided a content id." - ); - - // TODO(Matan): Figure out how to handle failed proposal building. I believe this should be - // handled by applying timeoutPropose when we are the leader. - let init = ProposalInit { - height: self.state_machine.height(), - round, - proposer: self.state_machine.validator_id(), - valid_round: None, - }; - CONSENSUS_BUILD_PROPOSAL_TOTAL.increment(1); - // TODO(Asmaa): Reconsider: we should keep the builder's timeout bounded independently of - // the consensus proposal timeout. We currently use the base (round 0) proposal - // timeout for building to avoid giving the Batcher more time when proposal time is - // extended for consensus. - let build_timeout = self.timeouts.get_proposal_timeout(0); - let fin_receiver = context.build_proposal(init, build_timeout).await; - vec![ShcTask::BuildProposal(round, fin_receiver)] - } - - async fn handle_state_machine_proposal( - &mut self, - context: &mut ContextT, - proposal_id: Option, - round: Round, - valid_round: Option, - ) { - let Some(valid_round) = valid_round else { - // Newly built proposals are handled by the BuildProposal flow. - return; - }; - let proposal_id = proposal_id.expect("Reproposal must have a valid ID"); - - // Make sure there is an existing proposal for the valid round and it matches the proposal - // ID. - let existing = self.state_machine.proposal_id_for_round(valid_round); - assert!( - existing.is_some_and(|id| id == proposal_id), - "A proposal with ID {proposal_id:?} should exist for valid_round: {valid_round}. \ - Found: {existing:?}", - ); - - let init = ProposalInit { - height: self.state_machine.height(), - round, - proposer: self.state_machine.validator_id(), - valid_round: Some(valid_round), - }; - CONSENSUS_REPROPOSALS.increment(1); - context.repropose(proposal_id, init).await; - } - - async fn handle_state_machine_vote( - &mut self, - context: &mut ContextT, - vote: Vote, - ) -> Result, ConsensusError> { - let task = match vote.vote_type { - VoteType::Prevote => ShcTask::Prevote( - self.timeouts.get_prevote_timeout(0), - StateMachineEvent::Prevote(vote.clone()), - ), - VoteType::Precommit => ShcTask::Precommit( - self.timeouts.get_precommit_timeout(0), - StateMachineEvent::Precommit(vote.clone()), - ), - }; - // Ensure the voter matches this node. - assert_eq!(vote.voter, self.state_machine.validator_id()); - - trace!("Writing voted height {} to storage", self.state_machine.height()); - self.height_voted_storage - .lock() - .expect( - "Lock should never be poisoned because there should never be concurrent access.", - ) - .set_prev_voted_height(self.state_machine.height()) - .expect("Failed to write voted height {self.height} to storage"); - - info!("Broadcasting {vote:?}"); - context.broadcast(vote).await?; - Ok(vec![task]) - } - async fn handle_state_machine_decision( &mut self, proposal_id: ProposalCommitment, diff --git a/crates/apollo_consensus/src/single_height_consensus_test.rs b/crates/apollo_consensus/src/single_height_consensus_test.rs index f5a5cc9db29..5839c885cc4 100644 --- a/crates/apollo_consensus/src/single_height_consensus_test.rs +++ b/crates/apollo_consensus/src/single_height_consensus_test.rs @@ -39,7 +39,7 @@ lazy_static! { static ref PROPOSAL_INIT: ProposalInit = ProposalInit { proposer: *PROPOSER_ID, ..Default::default() }; static ref TIMEOUTS: TimeoutsConfig = TimeoutsConfig::default(); - static ref VALIDATE_PROPOSAL_EVENT: StateMachineEvent = StateMachineEvent::Proposal( + static ref VALIDATE_PROPOSAL_EVENT: StateMachineEvent = StateMachineEvent::FinishedValidation( Some(BLOCK.id), PROPOSAL_INIT.round, PROPOSAL_INIT.valid_round, @@ -53,25 +53,24 @@ fn get_proposal_init_for_height(height: BlockNumber) -> ProposalInit { ProposalInit { height, ..*PROPOSAL_INIT } } -fn prevote_task(block_felt: Option, height: u64, round: u32, voter: ValidatorId) -> ShcTask { +fn rebroadcast_prevote_task( + block_felt: Option, + height: u64, + round: u32, + voter: ValidatorId, +) -> ShcTask { let duration = TIMEOUTS.get_prevote_timeout(0); - ShcTask::Prevote( - duration, - StateMachineEvent::Prevote(prevote(block_felt, height, round, voter)), - ) + ShcTask::Rebroadcast(duration, prevote(block_felt, height, round, voter)) } -fn precommit_task( +fn rebroadcast_precommit_task( block_felt: Option, height: u64, round: u32, voter: ValidatorId, ) -> ShcTask { let duration = TIMEOUTS.get_precommit_timeout(0); - ShcTask::Precommit( - duration, - StateMachineEvent::Precommit(precommit(block_felt, height, round, voter)), - ) + ShcTask::Rebroadcast(duration, precommit(block_felt, height, round, voter)) } fn timeout_prevote_task(round: u32) -> ShcTask { @@ -131,8 +130,9 @@ async fn proposer() { let shc_ret = shc.start(&mut context).await.unwrap(); assert_eq!(*shc_ret.as_tasks().unwrap()[0].as_build_proposal().unwrap().0, 0); assert_eq!( - shc.handle_event(&mut context, StateMachineEvent::GetProposal(Some(BLOCK.id), 0)).await, - Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)])) + shc.handle_event(&mut context, StateMachineEvent::FinishedBuilding(Some(BLOCK.id), 0)) + .await, + Ok(ShcReturn::Tasks(vec![rebroadcast_prevote_task(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)])) ); assert_eq!( shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)).await, @@ -149,7 +149,7 @@ async fn proposer() { shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await, Ok(ShcReturn::Tasks(vec![ timeout_prevote_task(0), - precommit_task(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID) + rebroadcast_precommit_task(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID) ])) ); @@ -215,7 +215,12 @@ async fn validator(repeat_proposal: bool) { ); assert_eq!( shc.handle_event(&mut context, VALIDATE_PROPOSAL_EVENT.clone()).await, - Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), HEIGHT.0, 0, *VALIDATOR_ID_1)])) + Ok(ShcReturn::Tasks(vec![rebroadcast_prevote_task( + Some(BLOCK.id.0), + HEIGHT.0, + 0, + *VALIDATOR_ID_1 + )])) ); if repeat_proposal { // Send the same proposal again, which should be ignored (no expectations). @@ -238,7 +243,7 @@ async fn validator(repeat_proposal: bool) { .await, Ok(ShcReturn::Tasks(vec![ timeout_prevote_task(0), - precommit_task(Some(BLOCK.id.0), HEIGHT.0, 0, *VALIDATOR_ID_1) + rebroadcast_precommit_task(Some(BLOCK.id.0), HEIGHT.0, 0, *VALIDATOR_ID_1) ])) ); @@ -297,7 +302,12 @@ async fn vote_twice(same_vote: bool) { ); assert_eq!( shc.handle_event(&mut context, VALIDATE_PROPOSAL_EVENT.clone()).await, - Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), HEIGHT.0, 0, *VALIDATOR_ID_1)])) + Ok(ShcReturn::Tasks(vec![rebroadcast_prevote_task( + Some(BLOCK.id.0), + HEIGHT.0, + 0, + *VALIDATOR_ID_1 + )])) ); let res = @@ -317,7 +327,7 @@ async fn vote_twice(same_vote: bool) { res, Ok(ShcReturn::Tasks(vec![ timeout_prevote_task(0), - precommit_task(Some(BLOCK.id.0), HEIGHT.0, 0, *VALIDATOR_ID_1), + rebroadcast_precommit_task(Some(BLOCK.id.0), HEIGHT.0, 0, *VALIDATOR_ID_1), ])) ); @@ -373,8 +383,9 @@ async fn rebroadcast_votes() { let shc_ret = shc.start(&mut context).await.unwrap(); assert_eq!(*shc_ret.as_tasks().unwrap()[0].as_build_proposal().unwrap().0, 0); assert_eq!( - shc.handle_event(&mut context, StateMachineEvent::GetProposal(Some(BLOCK.id), 0)).await, - Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)])) + shc.handle_event(&mut context, StateMachineEvent::FinishedBuilding(Some(BLOCK.id), 0)) + .await, + Ok(ShcReturn::Tasks(vec![rebroadcast_prevote_task(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)])) ); assert_eq!( shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)).await, @@ -392,7 +403,7 @@ async fn rebroadcast_votes() { Ok(ShcReturn::Tasks(vec![ timeout_prevote_task(0), // initiate timeout for rebroadcast at round 0. - precommit_task(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID), + rebroadcast_precommit_task(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID), ])) ); // Advance to the next round with NIL precommits. @@ -423,32 +434,30 @@ async fn rebroadcast_votes() { Ok(ShcReturn::Tasks(vec![ timeout_prevote_task(1), // initiate timeout for rebroadcast at round 1. - precommit_task(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID), + rebroadcast_precommit_task(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID), ])) ); - // Re-broadcast with older vote (round 0) - should be ignored (no broadcast, no task). assert_eq!( shc.handle_event( &mut context, - StateMachineEvent::Precommit(precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)) + StateMachineEvent::RebroadcastVote(precommit(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID)) ) .await, Ok(ShcReturn::Tasks(Vec::new())) ); - // Re-broadcast with current round (round 1) - should broadcast and schedule another timeout for // rebroadcast at round 1. assert_eq!( shc.handle_event( &mut context, - StateMachineEvent::Precommit(precommit(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID)) + StateMachineEvent::RebroadcastVote(precommit(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID)) ) .await, - Ok(ShcReturn::Tasks(vec![ShcTask::Precommit( - TIMEOUTS.get_precommit_timeout(0), - StateMachineEvent::Precommit(precommit(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID)), - )])) + Ok(ShcReturn::Tasks(vec![ + // initiate timeout for rebroadcast at round 1. + rebroadcast_precommit_task(Some(BLOCK.id.0), 0, 1, *PROPOSER_ID), + ])) ); } @@ -480,7 +489,7 @@ async fn repropose() { .returning(move |_| Ok(())); // Sends proposal and prevote. shc.start(&mut context).await.unwrap(); - shc.handle_event(&mut context, StateMachineEvent::GetProposal(Some(BLOCK.id), 0)) + shc.handle_event(&mut context, StateMachineEvent::FinishedBuilding(Some(BLOCK.id), 0)) .await .unwrap(); shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_1)).await.unwrap(); @@ -494,7 +503,7 @@ async fn repropose() { shc.handle_vote(&mut context, prevote(Some(BLOCK.id.0), 0, 0, *VALIDATOR_ID_2)).await, Ok(ShcReturn::Tasks(vec![ timeout_prevote_task(0), - precommit_task(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID), + rebroadcast_precommit_task(Some(BLOCK.id.0), 0, 0, *PROPOSER_ID), ])) ); // Advance to the next round. @@ -584,7 +593,12 @@ async fn writes_voted_height_to_storage() { ); assert_eq!( shc.handle_event(&mut context, VALIDATE_PROPOSAL_EVENT.clone()).await, - Ok(ShcReturn::Tasks(vec![prevote_task(Some(BLOCK.id.0), HEIGHT.0, 0, *VALIDATOR_ID_1)])) + Ok(ShcReturn::Tasks(vec![rebroadcast_prevote_task( + Some(BLOCK.id.0), + HEIGHT.0, + 0, + *VALIDATOR_ID_1 + )])) ); // This is the call that will result in the prevote broadcast and storage write. @@ -615,7 +629,7 @@ async fn writes_voted_height_to_storage() { .await, Ok(ShcReturn::Tasks(vec![ timeout_prevote_task(0), - precommit_task(Some(BLOCK.id.0), HEIGHT.0, 0, *VALIDATOR_ID_1) + rebroadcast_precommit_task(Some(BLOCK.id.0), HEIGHT.0, 0, *VALIDATOR_ID_1) ])) ); diff --git a/crates/apollo_consensus/src/state_machine.rs b/crates/apollo_consensus/src/state_machine.rs index 0fffd043d86..010a0ee0723 100644 --- a/crates/apollo_consensus/src/state_machine.rs +++ b/crates/apollo_consensus/src/state_machine.rs @@ -9,7 +9,7 @@ mod state_machine_test; use std::collections::{HashMap, HashSet, VecDeque}; -use apollo_protobuf::consensus::{Vote, VoteType}; +use apollo_protobuf::consensus::{ProposalInit, Vote, VoteType}; use serde::{Deserialize, Serialize}; use starknet_api::block::BlockNumber; use tracing::{debug, info, trace, warn}; @@ -36,31 +36,49 @@ type WeightedVote = (Vote, u32); /// A map of votes, keyed by round and validator ID, with the vote and its weight. type VotesMap = HashMap; -/// Events which the state machine sends/receives. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +/// Events which the state machine receives. These represent completion events +/// fed back to the SM after an external task is done. +#[derive(Debug, Clone, PartialEq)] pub(crate) enum StateMachineEvent { - /// Sent by the state machine when a block is required to propose (ProposalCommitment is always - /// None). While waiting for the response of GetProposal, the state machine will buffer all - /// other events. The caller *must* respond with a valid proposal id for this height to the - /// state machine, and the same round sent out. - GetProposal(Option, Round), - /// Consensus message, can be both sent from and to the state machine. - // (proposal_id, round, valid_round) - Proposal(Option, Round, Option), - /// Consensus message, can be both sent from and to the state machine. + /// The local proposal building task has completed. + FinishedBuilding(Option, Round), + /// A proposal validation task has completed. (proposal_id, round, valid_round) + FinishedValidation(Option, Round, Option), + /// Prevote message, sent from the SHC to the state machine. Prevote(Vote), - /// Consensus message, can be both sent from and to the state machine. + /// Precommit message, sent from the SHC to the state machine. Precommit(Vote), - /// The state machine returns this event to the caller when a decision is reached. Not - /// expected as an inbound message. We presume that the caller is able to recover the set of - /// precommits which led to this decision from the information returned here. - Decision(ProposalCommitment, Round), - /// Timeout events, can be both sent from and to the state machine. + /// TimeoutPropose event, sent from the state machine to the SHC. TimeoutPropose(Round), - /// Timeout events, can be both sent from and to the state machine. + /// TimeoutPrevote event, sent from the state machine to the SHC. TimeoutPrevote(Round), - /// Timeout events, can be both sent from and to the state machine. + /// TimeoutPrecommit event, sent from the state machine to the SHC. TimeoutPrecommit(Round), + /// Used by the SHC to rebroadcast a self-vote. + RebroadcastVote(Vote), +} + +/// Requests the SM/SHC sends to the caller for execution. +#[derive(Debug, Clone, PartialEq)] +pub(crate) enum SMRequest { + /// Request to build a proposal for a new round. + StartBuildProposal(Round), + /// Request to validate a received proposal from the network. + #[allow(dead_code)] + StartValidateProposal(ProposalInit), + /// Request to broadcast a Prevote or Precommit vote. + BroadcastVote(Vote), + /// Request to schedule a TimeoutPropose. + ScheduleTimeoutPropose(Round), + /// Request to schedule a TimeoutPrevote. + ScheduleTimeoutPrevote(Round), + /// Request to schedule a TimeoutPrecommit. + ScheduleTimeoutPrecommit(Round), + /// Decision reached for the given proposal and round. + DecisionReached(ProposalCommitment, Round), + /// Request to re-propose (sent by the leader after advancing to a new round + /// with a locked/valid value). + Repropose(ProposalCommitment, ProposalInit), } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] @@ -75,7 +93,6 @@ pub(crate) enum Step { /// 2. SM must handle "out of order" messages (E.g. vote arrives before proposal). /// /// Each height is begun with a call to `start`, with no further calls to it. -#[derive(Serialize, Deserialize)] pub(crate) struct StateMachine { height: BlockNumber, id: ValidatorId, @@ -90,9 +107,9 @@ pub(crate) struct StateMachine { // {(round, voter): (vote, weight)} prevotes: VotesMap, precommits: VotesMap, - // When true, the state machine will wait for a GetProposal event, buffering all other input - // events in `events_queue`. - awaiting_get_proposal: bool, + // When true, the state machine will wait for a FinishedBuilding event, buffering all other + // input events in `events_queue`. + awaiting_finished_building: bool, events_queue: VecDeque, locked_value_round: Option<(ProposalCommitment, Round)>, valid_value_round: Option<(ProposalCommitment, Round)>, @@ -127,7 +144,7 @@ impl StateMachine { proposals: HashMap::new(), prevotes: HashMap::new(), precommits: HashMap::new(), - awaiting_get_proposal: false, + awaiting_finished_building: false, events_queue: VecDeque::new(), locked_value_round: None, valid_value_round: None, @@ -187,7 +204,7 @@ impl StateMachine { &mut self, vote_type: VoteType, proposal_commitment: Option, - ) -> Vote { + ) -> VecDeque { let vote = Vote { vote_type, height: self.height.0, @@ -195,23 +212,36 @@ impl StateMachine { proposal_commitment, voter: self.id, }; - // update the latest self vote. - let last_self_vote = match vote_type { - VoteType::Prevote => &mut self.last_self_prevote, - VoteType::Precommit => &mut self.last_self_precommit, + let mut output = VecDeque::new(); + // Only non-observers record and track self-votes. + if self.is_observer { + return output; + } + let (votes_map, last_self_vote) = match vote_type { + VoteType::Prevote => (&mut self.prevotes, &mut self.last_self_prevote), + VoteType::Precommit => (&mut self.precommits, &mut self.last_self_precommit), }; + // Record the vote in the appropriate map. + let inserted = votes_map.insert((self.round, self.id), (vote.clone(), 1)).is_none(); + assert!( + inserted, + "This should never happen: duplicate self {:?} vote for round={}, id={}", + vote_type, self.round, self.id + ); + // Update the latest self vote. assert!( last_self_vote.as_ref().is_none_or(|last| self.round > last.round), "State machine must progress in time: last_vote: {last_self_vote:?} new_vote: {vote:?}" ); *last_self_vote = Some(vote.clone()); - vote + // Returns VecDeque instead of a single SMRequest so callers can chain requests using + // append(). + output.push_back(SMRequest::BroadcastVote(vote)); + output } - /// Starts the state machine, effectively calling `StartRound(0)` from the paper. This is - /// needed to trigger the first leader to propose. - /// See [`GetProposal`](StateMachineEvent::GetProposal) - pub(crate) fn start(&mut self, leader_fn: &LeaderFn) -> VecDeque + /// Starts the state machine, effectively calling `StartRound(0)` from the paper. + pub(crate) fn start(&mut self, leader_fn: &LeaderFn) -> VecDeque where LeaderFn: Fn(Round) -> ValidatorId, { @@ -220,26 +250,25 @@ impl StateMachine { /// Process the incoming event. /// - /// If we are waiting for a response to [`GetProposal`](`StateMachineEvent::GetProposal`) all - /// other incoming events are buffered until that response arrives. + /// If we are waiting for a response to + /// [`FinishedBuilding`](`StateMachineEvent::FinishedBuilding`) all other incoming events + /// are buffered until that response arrives. /// - /// Returns a set of events for the caller to handle. The caller should not mirror the output - /// events back to the state machine, as it makes sure to handle them before returning. - // This means that the StateMachine handles events the same regardless of whether it was sent by - // self or a peer. This is in line with the Algorithm 1 in the paper and keeps the code simpler. + /// Returns a set of requests for the caller to handle. The caller should handle them and pass + /// the relevant response back to the state machine. pub(crate) fn handle_event( &mut self, event: StateMachineEvent, leader_fn: &LeaderFn, - ) -> VecDeque + ) -> VecDeque where LeaderFn: Fn(Round) -> ValidatorId, { // Mimic LOC 18 in the paper; the state machine doesn't // handle any events until `getValue` completes. - if self.awaiting_get_proposal { + if self.awaiting_finished_building { match event { - StateMachineEvent::GetProposal(_, round) if round == self.round => { + StateMachineEvent::FinishedBuilding(_, round) if round == self.round => { self.events_queue.push_front(event); } _ => { @@ -257,99 +286,98 @@ impl StateMachine { pub(crate) fn handle_enqueued_events( &mut self, leader_fn: &LeaderFn, - ) -> VecDeque + ) -> VecDeque where LeaderFn: Fn(Round) -> ValidatorId, { - let mut output_events = VecDeque::new(); + let mut output_requests = VecDeque::new(); while let Some(event) = self.events_queue.pop_front() { - // Handle a specific event and then decide which of the output events should also be - // sent to self. - let mut resultant_events = self.handle_event_internal(event, leader_fn); - while let Some(e) = resultant_events.pop_front() { - match e { - StateMachineEvent::Proposal(_, _, _) - | StateMachineEvent::Prevote(_) - | StateMachineEvent::Precommit(_) => { - if self.is_observer { - continue; - } - self.events_queue.push_back(e.clone()); - } - StateMachineEvent::Decision(_, _) => { - output_events.push_back(e); - return output_events; - } - StateMachineEvent::GetProposal(_, _) => { + let mut resultant_requests = self.handle_event_internal(event, leader_fn); + while let Some(r) = resultant_requests.pop_front() { + match r { + SMRequest::StartBuildProposal(_) => { // LOC 18 in the paper. - assert!(resultant_events.is_empty()); + assert!(resultant_requests.is_empty()); assert!(!self.is_observer); - output_events.push_back(e); - return output_events; + output_requests.push_back(r); + return output_requests; + } + SMRequest::DecisionReached(_, _) => { + // These requests stop processing and return immediately. + output_requests.push_back(r); + return output_requests; + } + SMRequest::BroadcastVote(_) => { + assert!(!self.is_observer, "Observers should not broadcast votes"); + output_requests.push_back(r); + } + _ => { + output_requests.push_back(r); } - _ => {} } - output_events.push_back(e); } } - output_events + output_requests } pub(crate) fn handle_event_internal( &mut self, event: StateMachineEvent, leader_fn: &LeaderFn, - ) -> VecDeque + ) -> VecDeque where LeaderFn: Fn(Round) -> ValidatorId, { trace!("Processing event: {:?}", event); - if self.awaiting_get_proposal { - assert!(matches!(event, StateMachineEvent::GetProposal(_, _)), "{event:?}"); + if self.awaiting_finished_building { + assert!(matches!(event, StateMachineEvent::FinishedBuilding(_, _)), "{event:?}"); } match event { - StateMachineEvent::GetProposal(proposal_id, round) => { - self.handle_get_proposal(proposal_id, round) + StateMachineEvent::FinishedBuilding(proposal_id, round) => { + self.handle_finished_building(proposal_id, round, leader_fn) } - StateMachineEvent::Proposal(proposal_id, round, valid_round) => { - self.handle_proposal(proposal_id, round, valid_round, leader_fn) + StateMachineEvent::FinishedValidation(proposal_id, round, valid_round) => { + self.handle_finished_validation(proposal_id, round, valid_round, leader_fn) } StateMachineEvent::Prevote(vote) => self.handle_prevote(vote, leader_fn), StateMachineEvent::Precommit(vote) => self.handle_precommit(vote, leader_fn), - StateMachineEvent::Decision(_, _) => { - unimplemented!( - "If the caller knows of a decision, it can just drop the state machine." - ) - } StateMachineEvent::TimeoutPropose(round) => self.handle_timeout_propose(round), StateMachineEvent::TimeoutPrevote(round) => self.handle_timeout_prevote(round), StateMachineEvent::TimeoutPrecommit(round) => { self.handle_timeout_precommit(round, leader_fn) } + StateMachineEvent::RebroadcastVote(_) => { + unreachable!("StateMachine should not receive RebroadcastVote events"); + } } } - pub(crate) fn handle_get_proposal( + pub(crate) fn handle_finished_building( &mut self, proposal_id: Option, round: u32, - ) -> VecDeque { - // TODO(matan): Will we allow other events (timeoutPropose) to exit this state? - assert!(self.awaiting_get_proposal); + leader_fn: &LeaderFn, + ) -> VecDeque + where + LeaderFn: Fn(Round) -> ValidatorId, + { + assert!(self.awaiting_finished_building); assert_eq!(round, self.round); - self.awaiting_get_proposal = false; - VecDeque::from([StateMachineEvent::Proposal(proposal_id, round, None)]) + self.awaiting_finished_building = false; + let old = self.proposals.insert(round, (proposal_id, None)); + assert!(old.is_none(), "Proposal built when one already exists for this round."); + + self.map_round_to_upons(round, leader_fn) } - // A proposal from a peer (or self) node. - pub(crate) fn handle_proposal( + pub(crate) fn handle_finished_validation( &mut self, proposal_id: Option, round: u32, valid_round: Option, leader_fn: &LeaderFn, - ) -> VecDeque + ) -> VecDeque where LeaderFn: Fn(Round) -> ValidatorId, { @@ -358,7 +386,7 @@ impl StateMachine { self.map_round_to_upons(round, leader_fn) } - pub(crate) fn handle_timeout_propose(&mut self, round: u32) -> VecDeque { + pub(crate) fn handle_timeout_propose(&mut self, round: u32) -> VecDeque { if self.step != Step::Propose || round != self.round { return VecDeque::new(); }; @@ -367,18 +395,17 @@ impl StateMachine { round={round}." ); CONSENSUS_TIMEOUTS.increment(1, &[(LABEL_NAME_TIMEOUT_TYPE, TimeoutType::Propose.into())]); - let vote = self.make_self_vote(VoteType::Prevote, None); - let mut output = VecDeque::from([StateMachineEvent::Prevote(vote)]); + let mut output = self.make_self_vote(VoteType::Prevote, None); output.append(&mut self.advance_to_step(Step::Prevote)); output } - // A prevote from a peer (or self) node. + // A prevote from a peer node. pub(crate) fn handle_prevote( &mut self, vote: Vote, leader_fn: &LeaderFn, - ) -> VecDeque + ) -> VecDeque where LeaderFn: Fn(Round) -> ValidatorId, { @@ -393,24 +420,23 @@ impl StateMachine { self.map_round_to_upons(round, leader_fn) } - pub(crate) fn handle_timeout_prevote(&mut self, round: u32) -> VecDeque { + pub(crate) fn handle_timeout_prevote(&mut self, round: u32) -> VecDeque { if self.step != Step::Prevote || round != self.round { return VecDeque::new(); }; debug!("Applying TimeoutPrevote for round={round}."); CONSENSUS_TIMEOUTS.increment(1, &[(LABEL_NAME_TIMEOUT_TYPE, TimeoutType::Prevote.into())]); - let vote = self.make_self_vote(VoteType::Precommit, None); - let mut output = VecDeque::from([StateMachineEvent::Precommit(vote)]); + let mut output = self.make_self_vote(VoteType::Precommit, None); output.append(&mut self.advance_to_step(Step::Precommit)); output } - // A precommit from a peer (or self) node. + // A precommit from a peer node. fn handle_precommit( &mut self, vote: Vote, leader_fn: &LeaderFn, - ) -> VecDeque + ) -> VecDeque where LeaderFn: Fn(Round) -> ValidatorId, { @@ -429,7 +455,7 @@ impl StateMachine { &mut self, round: u32, leader_fn: &LeaderFn, - ) -> VecDeque + ) -> VecDeque where LeaderFn: Fn(Round) -> ValidatorId, { @@ -447,7 +473,7 @@ impl StateMachine { &mut self, round: u32, leader_fn: &LeaderFn, - ) -> VecDeque + ) -> VecDeque where LeaderFn: Fn(Round) -> ValidatorId, { @@ -468,26 +494,35 @@ impl StateMachine { info!("START_ROUND_PROPOSER: Starting round {round} as Proposer"); // Leader. match self.valid_value_round { - Some((proposal_id, valid_round)) => VecDeque::from([StateMachineEvent::Proposal( - Some(proposal_id), - self.round, - Some(valid_round), - )]), + Some((proposal_id, valid_round)) => { + // Record the valid proposal for the current round so upon_reproposal() can + // observe it and emit the corresponding prevote immediately. + let old = + self.proposals.insert(self.round, (Some(proposal_id), Some(valid_round))); + assert!(old.is_none(), "Proposal for current round should not already exist"); + let init = ProposalInit { + height: self.height, + round: self.round, + proposer: self.id, + valid_round: Some(valid_round), + }; + VecDeque::from([SMRequest::Repropose(proposal_id, init)]) + } None => { - self.awaiting_get_proposal = true; + self.awaiting_finished_building = true; // Upon conditions are not checked while awaiting a new proposal. - return VecDeque::from([StateMachineEvent::GetProposal(None, self.round)]); + return VecDeque::from([SMRequest::StartBuildProposal(self.round)]); } } } else { info!("START_ROUND_VALIDATOR: Starting round {round} as Validator"); - VecDeque::from([StateMachineEvent::TimeoutPropose(self.round)]) + VecDeque::from([SMRequest::ScheduleTimeoutPropose(self.round)]) }; output.append(&mut self.current_round_upons()); output } - fn advance_to_step(&mut self, step: Step) -> VecDeque { + fn advance_to_step(&mut self, step: Step) -> VecDeque { assert_ne!(step, Step::Propose, "Advancing to Propose is done by advancing rounds"); info!("Advancing step: from {:?} to {step:?} in round={}", self.step, self.round); self.step = step; @@ -498,7 +533,7 @@ impl StateMachine { &mut self, round: u32, leader_fn: &LeaderFn, - ) -> VecDeque + ) -> VecDeque where LeaderFn: Fn(Round) -> ValidatorId, { @@ -509,7 +544,7 @@ impl StateMachine { } } - fn current_round_upons(&mut self) -> VecDeque { + fn current_round_upons(&mut self) -> VecDeque { let mut output = VecDeque::new(); output.append(&mut self.upon_new_proposal()); output.append(&mut self.upon_reproposal()); @@ -521,7 +556,7 @@ impl StateMachine { output } - fn past_round_upons(&mut self, round: u32) -> VecDeque { + fn past_round_upons(&mut self, round: u32) -> VecDeque { let mut output = VecDeque::new(); output.append(&mut self.upon_reproposal()); output.append(&mut self.upon_decision(round)); @@ -529,7 +564,7 @@ impl StateMachine { } // LOC 22 in the paper. - fn upon_new_proposal(&mut self) -> VecDeque { + fn upon_new_proposal(&mut self) -> VecDeque { // StateMachine assumes that the proposer is valid. if self.step != Step::Propose { return VecDeque::new(); @@ -547,14 +582,13 @@ impl StateMachine { } else { None }; - let vote = self.make_self_vote(VoteType::Prevote, vote_commitment); - let mut output = VecDeque::from([StateMachineEvent::Prevote(vote)]); + let mut output = self.make_self_vote(VoteType::Prevote, vote_commitment); output.append(&mut self.advance_to_step(Step::Prevote)); output } // LOC 28 in the paper. - fn upon_reproposal(&mut self) -> VecDeque { + fn upon_reproposal(&mut self) -> VecDeque { if self.step != Step::Propose { return VecDeque::new(); } @@ -579,14 +613,13 @@ impl StateMachine { } else { None }; - let vote = self.make_self_vote(VoteType::Prevote, vote_commitment); - let mut output = VecDeque::from([StateMachineEvent::Prevote(vote)]); + let mut output = self.make_self_vote(VoteType::Prevote, vote_commitment); output.append(&mut self.advance_to_step(Step::Prevote)); output } // LOC 34 in the paper. - fn maybe_initiate_timeout_prevote(&mut self) -> VecDeque { + fn maybe_initiate_timeout_prevote(&mut self) -> VecDeque { if self.step != Step::Prevote { return VecDeque::new(); } @@ -597,11 +630,11 @@ impl StateMachine { if !self.mixed_prevote_quorum.insert(self.round) { return VecDeque::new(); } - VecDeque::from([StateMachineEvent::TimeoutPrevote(self.round)]) + VecDeque::from([SMRequest::ScheduleTimeoutPrevote(self.round)]) } // LOC 36 in the paper. - fn upon_prevote_quorum(&mut self) -> VecDeque { + fn upon_prevote_quorum(&mut self) -> VecDeque { if self.step == Step::Propose { return VecDeque::new(); } @@ -629,28 +662,26 @@ impl StateMachine { CONSENSUS_NEW_VALUE_LOCKS.increment(1); } self.locked_value_round = new_value; - let vote = self.make_self_vote(VoteType::Precommit, Some(*proposal_id)); - let mut output = VecDeque::from([StateMachineEvent::Precommit(vote)]); + let mut output = self.make_self_vote(VoteType::Precommit, Some(*proposal_id)); output.append(&mut self.advance_to_step(Step::Precommit)); output } // LOC 44 in the paper. - fn upon_nil_prevote_quorum(&mut self) -> VecDeque { + fn upon_nil_prevote_quorum(&mut self) -> VecDeque { if self.step != Step::Prevote { return VecDeque::new(); } if !self.value_has_enough_votes(&self.prevotes, self.round, &None, &self.quorum) { return VecDeque::new(); } - let vote = self.make_self_vote(VoteType::Precommit, None); - let mut output = VecDeque::from([StateMachineEvent::Precommit(vote)]); + let mut output = self.make_self_vote(VoteType::Precommit, None); output.append(&mut self.advance_to_step(Step::Precommit)); output } // LOC 47 in the paper. - fn maybe_initiate_timeout_precommit(&mut self) -> VecDeque { + fn maybe_initiate_timeout_precommit(&mut self) -> VecDeque { if !self.round_has_enough_votes(&self.precommits, self.round, &self.quorum) { return VecDeque::new(); } @@ -658,11 +689,11 @@ impl StateMachine { if !self.mixed_precommit_quorum.insert(self.round) { return VecDeque::new(); } - VecDeque::from([StateMachineEvent::TimeoutPrecommit(self.round)]) + VecDeque::from([SMRequest::ScheduleTimeoutPrecommit(self.round)]) } // LOC 49 in the paper. - fn upon_decision(&mut self, round: u32) -> VecDeque { + fn upon_decision(&mut self, round: u32) -> VecDeque { let Some((Some(proposal_id), _)) = self.proposals.get(&round) else { return VecDeque::new(); }; @@ -671,7 +702,7 @@ impl StateMachine { return VecDeque::new(); } - VecDeque::from([StateMachineEvent::Decision(*proposal_id, round)]) + VecDeque::from([SMRequest::DecisionReached(*proposal_id, round)]) } // LOC 55 in the paper. @@ -679,7 +710,7 @@ impl StateMachine { &mut self, round: u32, leader_fn: &LeaderFn, - ) -> VecDeque + ) -> VecDeque where LeaderFn: Fn(Round) -> ValidatorId, { diff --git a/crates/apollo_consensus/src/state_machine_test.rs b/crates/apollo_consensus/src/state_machine_test.rs index e8d58e0c7bf..4e9ef9ae529 100644 --- a/crates/apollo_consensus/src/state_machine_test.rs +++ b/crates/apollo_consensus/src/state_machine_test.rs @@ -7,7 +7,7 @@ use starknet_types_core::felt::Felt; use test_case::test_case; use super::Round; -use crate::state_machine::{StateMachine, StateMachineEvent}; +use crate::state_machine::{SMRequest, StateMachine, StateMachineEvent}; use crate::types::{ProposalCommitment, ValidatorId}; use crate::votes_threshold::QuorumType; @@ -34,7 +34,7 @@ fn mk_vote( struct TestWrapper ValidatorId> { state_machine: StateMachine, leader_fn: LeaderFn, - events: VecDeque, + requests: VecDeque, peer_voters: Vec, next_peer_idx: usize, } @@ -56,7 +56,7 @@ impl ValidatorId> TestWrapper { Self { state_machine: StateMachine::new(HEIGHT, id, total_weight, is_observer, quorum_type), leader_fn, - events: VecDeque::new(), + requests: VecDeque::new(), peer_voters, next_peer_idx: 0, } @@ -68,20 +68,28 @@ impl ValidatorId> TestWrapper { voter } - pub fn next_event(&mut self) -> Option { - self.events.pop_front() + pub fn next_request(&mut self) -> Option { + self.requests.pop_front() } pub fn start(&mut self) { - self.events.append(&mut self.state_machine.start(&self.leader_fn)) + self.requests.append(&mut self.state_machine.start(&self.leader_fn)) } - pub fn send_get_proposal(&mut self, proposal_id: Option, round: Round) { - self.send_event(StateMachineEvent::GetProposal(proposal_id, round)) + pub fn send_finished_building( + &mut self, + proposal_id: Option, + round: Round, + ) { + self.send_event(StateMachineEvent::FinishedBuilding(proposal_id, round)) } - pub fn send_proposal(&mut self, proposal_id: Option, round: Round) { - self.send_event(StateMachineEvent::Proposal(proposal_id, round, None)) + pub fn send_finished_validation( + &mut self, + proposal_id: Option, + round: Round, + ) { + self.send_event(StateMachineEvent::FinishedValidation(proposal_id, round, None)) } pub fn send_prevote(&mut self, proposal_id: Option, round: Round) { @@ -117,7 +125,7 @@ impl ValidatorId> TestWrapper { } fn send_event(&mut self, event: StateMachineEvent) { - self.events.append(&mut self.state_machine.handle_event(event, &self.leader_fn)); + self.requests.append(&mut self.state_machine.handle_event(event, &self.leader_fn)); } } @@ -130,47 +138,43 @@ fn events_arrive_in_ideal_order(is_proposer: bool) { wrapper.start(); if is_proposer { - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::GetProposal(None, ROUND)); - wrapper.send_get_proposal(PROPOSAL_ID, ROUND); - assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Proposal(PROPOSAL_ID, ROUND, None) - ); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::StartBuildProposal(ROUND)); + wrapper.send_finished_building(PROPOSAL_ID, ROUND); } else { // Waiting for the proposal. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPropose(ROUND)); - assert!(wrapper.next_event().is_none()); - wrapper.send_proposal(PROPOSAL_ID, ROUND); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPropose(ROUND)); + assert!(wrapper.next_request().is_none()); + wrapper.send_finished_validation(PROPOSAL_ID, ROUND); } assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Prevote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, id)) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, id)) ); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); wrapper.send_prevote(PROPOSAL_ID, ROUND); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); wrapper.send_prevote(PROPOSAL_ID, ROUND); // The Node got a Prevote quorum. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrevote(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrevote(ROUND)); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Precommit(mk_vote(VoteType::Precommit, ROUND, PROPOSAL_ID, id)) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Precommit, ROUND, PROPOSAL_ID, id)) ); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); wrapper.send_precommit(PROPOSAL_ID, ROUND); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); wrapper.send_precommit(PROPOSAL_ID, ROUND); // The Node got a Precommit quorum. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrecommit(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrecommit(ROUND)); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Decision(PROPOSAL_ID.unwrap(), ROUND) + wrapper.next_request().unwrap(), + SMRequest::DecisionReached(PROPOSAL_ID.unwrap(), ROUND) ); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); } #[test] @@ -180,8 +184,8 @@ fn validator_receives_votes_first() { wrapper.start(); // Waiting for the proposal. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPropose(ROUND)); - assert!(wrapper.next_event().is_none()); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPropose(ROUND)); + assert!(wrapper.next_request().is_none()); // Receives votes from all the other nodes first (more than minimum for a quorum). wrapper.send_prevote(PROPOSAL_ID, ROUND); @@ -192,30 +196,25 @@ fn validator_receives_votes_first() { wrapper.send_precommit(PROPOSAL_ID, ROUND); // The Node got a Precommit quorum. TimeoutPrevote is only initiated once the SM reaches the // prevote step. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrecommit(ROUND)); - assert!(wrapper.next_event().is_none()); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrecommit(ROUND)); + assert!(wrapper.next_request().is_none()); // Finally the proposal arrives. - wrapper.send_proposal(PROPOSAL_ID, ROUND); + wrapper.send_finished_validation(PROPOSAL_ID, ROUND); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Prevote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) ); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrevote(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrevote(ROUND)); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Precommit(mk_vote( - VoteType::Precommit, - ROUND, - PROPOSAL_ID, - *VALIDATOR_ID - )) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Precommit, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) ); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Decision(PROPOSAL_ID.unwrap(), ROUND) + wrapper.next_request().unwrap(), + SMRequest::DecisionReached(PROPOSAL_ID.unwrap(), ROUND) ); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); } #[test_case(PROPOSAL_ID ; "valid_proposal")] @@ -225,30 +224,26 @@ fn buffer_events_during_get_proposal(vote: Option) { TestWrapper::new(*PROPOSER_ID, 4, |_: Round| *PROPOSER_ID, false, QuorumType::Byzantine); wrapper.start(); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::GetProposal(None, 0)); - assert!(wrapper.next_event().is_none()); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::StartBuildProposal(0)); + assert!(wrapper.next_request().is_none()); wrapper.send_prevote(vote, ROUND); wrapper.send_prevote(vote, ROUND); wrapper.send_prevote(vote, ROUND); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); // Node finishes building the proposal. - wrapper.send_get_proposal(PROPOSAL_ID, ROUND); - assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Proposal(PROPOSAL_ID, ROUND, None) - ); + wrapper.send_finished_building(PROPOSAL_ID, ROUND); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Prevote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *PROPOSER_ID)) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *PROPOSER_ID)) ); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrevote(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrevote(ROUND)); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Precommit(mk_vote(VoteType::Precommit, ROUND, vote, *PROPOSER_ID)) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Precommit, ROUND, vote, *PROPOSER_ID)) ); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); } #[test] @@ -258,32 +253,27 @@ fn only_send_precommit_with_prevote_quorum_and_proposal() { wrapper.start(); // Waiting for the proposal. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPropose(ROUND)); - assert!(wrapper.next_event().is_none()); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPropose(ROUND)); + assert!(wrapper.next_request().is_none()); // Receives votes from all the other nodes first (more than minimum for a quorum). wrapper.send_prevote(PROPOSAL_ID, ROUND); wrapper.send_prevote(PROPOSAL_ID, ROUND); wrapper.send_prevote(PROPOSAL_ID, ROUND); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); // Finally the proposal arrives. - wrapper.send_proposal(PROPOSAL_ID, ROUND); + wrapper.send_finished_validation(PROPOSAL_ID, ROUND); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Prevote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) ); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrevote(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrevote(ROUND)); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Precommit(mk_vote( - VoteType::Precommit, - ROUND, - PROPOSAL_ID, - *VALIDATOR_ID - )) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Precommit, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) ); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); } #[test] @@ -293,8 +283,8 @@ fn only_decide_with_prcommit_quorum_and_proposal() { wrapper.start(); // Waiting for the proposal. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPropose(ROUND)); - assert!(wrapper.next_event().is_none()); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPropose(ROUND)); + assert!(wrapper.next_request().is_none()); // Receives votes from all the other nodes first (more than minimum for a quorum). wrapper.send_prevote(PROPOSAL_ID, ROUND); @@ -302,30 +292,25 @@ fn only_decide_with_prcommit_quorum_and_proposal() { wrapper.send_prevote(PROPOSAL_ID, ROUND); wrapper.send_precommit(PROPOSAL_ID, ROUND); wrapper.send_precommit(PROPOSAL_ID, ROUND); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); // Finally the proposal arrives. - wrapper.send_proposal(PROPOSAL_ID, ROUND); + wrapper.send_finished_validation(PROPOSAL_ID, ROUND); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Prevote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) ); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrevote(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrevote(ROUND)); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Precommit(mk_vote( - VoteType::Precommit, - ROUND, - PROPOSAL_ID, - *VALIDATOR_ID - )) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Precommit, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) ); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrecommit(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrecommit(ROUND)); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Decision(PROPOSAL_ID.unwrap(), ROUND) + wrapper.next_request().unwrap(), + SMRequest::DecisionReached(PROPOSAL_ID.unwrap(), ROUND) ); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); } #[test] @@ -335,34 +320,29 @@ fn advance_to_the_next_round() { wrapper.start(); // Waiting for the proposal. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPropose(ROUND)); - assert!(wrapper.next_event().is_none()); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPropose(ROUND)); + assert!(wrapper.next_request().is_none()); - wrapper.send_proposal(PROPOSAL_ID, ROUND); + wrapper.send_finished_validation(PROPOSAL_ID, ROUND); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Prevote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) ); wrapper.send_precommit(None, ROUND); wrapper.send_precommit(None, ROUND); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); - wrapper.send_proposal(PROPOSAL_ID, ROUND + 1); - assert!(wrapper.next_event().is_none()); + wrapper.send_finished_validation(PROPOSAL_ID, ROUND + 1); + assert!(wrapper.next_request().is_none()); wrapper.send_precommit(None, ROUND); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrecommit(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrecommit(ROUND)); wrapper.send_timeout_precommit(ROUND); // The Node sends Prevote after advancing to the next round. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPropose(ROUND + 1)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPropose(ROUND + 1)); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Prevote(mk_vote( - VoteType::Prevote, - ROUND + 1, - PROPOSAL_ID, - *VALIDATOR_ID - )) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND + 1, PROPOSAL_ID, *VALIDATOR_ID)) ); } @@ -372,30 +352,25 @@ fn prevote_when_receiving_proposal_in_current_round() { TestWrapper::new(*VALIDATOR_ID, 4, |_: Round| *PROPOSER_ID, false, QuorumType::Byzantine); wrapper.start(); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPropose(ROUND)); - assert!(wrapper.next_event().is_none()); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPropose(ROUND)); + assert!(wrapper.next_request().is_none()); wrapper.send_precommit(None, ROUND); wrapper.send_precommit(None, ROUND); wrapper.send_precommit(None, ROUND); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrecommit(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrecommit(ROUND)); wrapper.send_timeout_precommit(ROUND); // The node starts the next round, shouldn't prevote when receiving a proposal for the // previous round. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPropose(ROUND + 1)); - wrapper.send_proposal(PROPOSAL_ID, ROUND); - assert!(wrapper.next_event().is_none()); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPropose(ROUND + 1)); + wrapper.send_finished_validation(PROPOSAL_ID, ROUND); + assert!(wrapper.next_request().is_none()); // The node should prevote when receiving a proposal for the current round. - wrapper.send_proposal(PROPOSAL_ID, ROUND + 1); + wrapper.send_finished_validation(PROPOSAL_ID, ROUND + 1); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Prevote(mk_vote( - VoteType::Prevote, - ROUND + 1, - PROPOSAL_ID, - *VALIDATOR_ID - )) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND + 1, PROPOSAL_ID, *VALIDATOR_ID)) ); } @@ -407,42 +382,37 @@ fn mixed_quorum(send_proposal: bool) { wrapper.start(); // Waiting for the proposal. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPropose(ROUND)); - assert!(wrapper.events.is_empty()); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPropose(ROUND)); + assert!(wrapper.requests.is_empty()); if send_proposal { - wrapper.send_proposal(PROPOSAL_ID, ROUND); + wrapper.send_finished_validation(PROPOSAL_ID, ROUND); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Prevote(mk_vote( - VoteType::Prevote, - ROUND, - PROPOSAL_ID, - *VALIDATOR_ID - )) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) ); } else { wrapper.send_timeout_propose(ROUND); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Prevote(mk_vote(VoteType::Prevote, ROUND, None, *VALIDATOR_ID)) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND, None, *VALIDATOR_ID)) ); } wrapper.send_prevote(PROPOSAL_ID, ROUND); wrapper.send_prevote(None, ROUND); // The Node got a Prevote quorum. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrevote(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrevote(ROUND)); wrapper.send_timeout_prevote(ROUND); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Precommit(mk_vote(VoteType::Precommit, ROUND, None, *VALIDATOR_ID)) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Precommit, ROUND, None, *VALIDATOR_ID)) ); wrapper.send_precommit(PROPOSAL_ID, ROUND); wrapper.send_precommit(PROPOSAL_ID, ROUND); // The Node got a Precommit quorum. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrecommit(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrecommit(ROUND)); wrapper.send_timeout_precommit(ROUND); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPropose(ROUND + 1)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPropose(ROUND + 1)); } #[test] @@ -451,8 +421,8 @@ fn dont_handle_enqueued_while_awaiting_get_proposal() { TestWrapper::new(*PROPOSER_ID, 4, |_: Round| *PROPOSER_ID, false, QuorumType::Byzantine); wrapper.start(); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::GetProposal(None, ROUND)); - assert!(wrapper.next_event().is_none()); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::StartBuildProposal(ROUND)); + assert!(wrapper.next_request().is_none()); // We simulate that this node is always the proposer, but it lagged, so the peers kept voting // NIL and progressing rounds. @@ -470,25 +440,37 @@ fn dont_handle_enqueued_while_awaiting_get_proposal() { wrapper.send_precommit(None, ROUND + 1); // It now receives the proposal. - wrapper.send_get_proposal(PROPOSAL_ID, ROUND); + wrapper.send_finished_building(PROPOSAL_ID, ROUND); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Proposal(PROPOSAL_ID, ROUND, None) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *PROPOSER_ID)) ); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrecommit(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrevote(ROUND)); + // Nil Prevote quorum, so we broadcast a nil Precommit. + assert_eq!( + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Precommit, ROUND, None, *PROPOSER_ID)) + ); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrecommit(ROUND)); // Timeout and advance on to the next round. wrapper.send_timeout_precommit(ROUND); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::GetProposal(None, ROUND + 1)); - assert!(wrapper.next_event().is_none()); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::StartBuildProposal(ROUND + 1)); + assert!(wrapper.next_request().is_none()); // The other votes are only handled after the next GetProposal is received. - wrapper.send_get_proposal(PROPOSAL_ID, ROUND + 1); + wrapper.send_finished_building(PROPOSAL_ID, ROUND + 1); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Proposal(PROPOSAL_ID, ROUND + 1, None) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND + 1, PROPOSAL_ID, *PROPOSER_ID)) ); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrecommit(ROUND + 1)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrevote(ROUND + 1)); + // Nil Prevote quorum, so we broadcast a nil Precommit. + assert_eq!( + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Precommit, ROUND + 1, None, *PROPOSER_ID)) + ); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrecommit(ROUND + 1)); } #[test] @@ -497,51 +479,45 @@ fn return_proposal_if_locked_value_is_set() { TestWrapper::new(*PROPOSER_ID, 4, |_: Round| *PROPOSER_ID, false, QuorumType::Byzantine); wrapper.start(); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::GetProposal(None, ROUND)); - assert!(wrapper.next_event().is_none()); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::StartBuildProposal(ROUND)); + assert!(wrapper.next_request().is_none()); - wrapper.send_get_proposal(PROPOSAL_ID, ROUND); + wrapper.send_finished_building(PROPOSAL_ID, ROUND); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Proposal(PROPOSAL_ID, ROUND, None) - ); - assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Prevote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *PROPOSER_ID)) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *PROPOSER_ID)) ); // locked_value is set after receiving a Prevote quorum. wrapper.send_prevote(PROPOSAL_ID, ROUND); wrapper.send_prevote(PROPOSAL_ID, ROUND); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrevote(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrevote(ROUND)); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Precommit(mk_vote( - VoteType::Precommit, - ROUND, - PROPOSAL_ID, - *PROPOSER_ID - )) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Precommit, ROUND, PROPOSAL_ID, *PROPOSER_ID)) ); wrapper.send_precommit(None, ROUND); wrapper.send_precommit(None, ROUND); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrecommit(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrecommit(ROUND)); wrapper.send_timeout_precommit(ROUND); // no need to GetProposal since we already have a locked value. assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Proposal(PROPOSAL_ID, ROUND + 1, Some(ROUND)) + wrapper.next_request().unwrap(), + SMRequest::Repropose( + PROPOSAL_ID.unwrap(), + apollo_protobuf::consensus::ProposalInit { + height: HEIGHT, + round: ROUND + 1, + proposer: *PROPOSER_ID, + valid_round: Some(ROUND), + } + ) ); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Prevote(mk_vote( - VoteType::Prevote, - ROUND + 1, - PROPOSAL_ID, - *PROPOSER_ID - )) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND + 1, PROPOSAL_ID, *PROPOSER_ID)) ); } @@ -553,22 +529,22 @@ fn observer_node_reaches_decision() { wrapper.start(); // Waiting for the proposal. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPropose(ROUND)); - assert!(wrapper.next_event().is_none()); - wrapper.send_proposal(PROPOSAL_ID, ROUND); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPropose(ROUND)); + assert!(wrapper.next_request().is_none()); + wrapper.send_finished_validation(PROPOSAL_ID, ROUND); // The observer node does not respond to the proposal by sending votes. - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); wrapper.send_precommit(PROPOSAL_ID, ROUND); wrapper.send_precommit(PROPOSAL_ID, ROUND); wrapper.send_precommit(PROPOSAL_ID, ROUND); - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrecommit(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrecommit(ROUND)); // Once a quorum of precommits is observed, the node should generate a decision event. assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Decision(PROPOSAL_ID.unwrap(), ROUND) + wrapper.next_request().unwrap(), + SMRequest::DecisionReached(PROPOSAL_ID.unwrap(), ROUND) ); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); } #[test_case(QuorumType::Byzantine; "byzantine")] @@ -579,16 +555,16 @@ fn number_of_required_votes(quorum_type: QuorumType) { wrapper.start(); // Waiting for the proposal. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPropose(ROUND)); - assert!(wrapper.next_event().is_none()); - wrapper.send_proposal(PROPOSAL_ID, ROUND); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPropose(ROUND)); + assert!(wrapper.next_request().is_none()); + wrapper.send_finished_validation(PROPOSAL_ID, ROUND); // The node says this proposal is valid (vote 1). assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Prevote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Prevote, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) ); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); // Another node sends a Prevote (vote 2). wrapper.send_prevote(PROPOSAL_ID, ROUND); @@ -596,7 +572,7 @@ fn number_of_required_votes(quorum_type: QuorumType) { // Byzantine quorum requires 3 votes, so we need one more vote. if quorum_type == QuorumType::Byzantine { // Not enough votes for a quorum yet. - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); // Another node sends a Prevote (vote 3). wrapper.send_prevote(PROPOSAL_ID, ROUND); @@ -604,19 +580,14 @@ fn number_of_required_votes(quorum_type: QuorumType) { // In honest case, the second vote is enough for a quorum. // The Node got a Prevote quorum. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrevote(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrevote(ROUND)); // The Node sends a Precommit (vote 1). assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Precommit(mk_vote( - VoteType::Precommit, - ROUND, - PROPOSAL_ID, - *VALIDATOR_ID - )) + wrapper.next_request().unwrap(), + SMRequest::BroadcastVote(mk_vote(VoteType::Precommit, ROUND, PROPOSAL_ID, *VALIDATOR_ID)) ); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); // Another node sends a Precommit (vote 2). wrapper.send_precommit(PROPOSAL_ID, ROUND); @@ -624,7 +595,7 @@ fn number_of_required_votes(quorum_type: QuorumType) { // Byzantine quorum requires 3 votes, so we need one more vote. if quorum_type == QuorumType::Byzantine { // Not enough votes for a quorum yet. - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); // Another node sends a Precommit (vote 3). wrapper.send_precommit(PROPOSAL_ID, ROUND); @@ -632,10 +603,10 @@ fn number_of_required_votes(quorum_type: QuorumType) { // In honest case, the second vote is enough for a quorum. // The Node got a Precommit quorum. - assert_eq!(wrapper.next_event().unwrap(), StateMachineEvent::TimeoutPrecommit(ROUND)); + assert_eq!(wrapper.next_request().unwrap(), SMRequest::ScheduleTimeoutPrecommit(ROUND)); assert_eq!( - wrapper.next_event().unwrap(), - StateMachineEvent::Decision(PROPOSAL_ID.unwrap(), ROUND) + wrapper.next_request().unwrap(), + SMRequest::DecisionReached(PROPOSAL_ID.unwrap(), ROUND) ); - assert!(wrapper.next_event().is_none()); + assert!(wrapper.next_request().is_none()); }