diff --git a/crates/apollo_consensus/src/manager_test.rs b/crates/apollo_consensus/src/manager_test.rs index 9eeb7c2886a..cd0e19e57a7 100644 --- a/crates/apollo_consensus/src/manager_test.rs +++ b/crates/apollo_consensus/src/manager_test.rs @@ -39,7 +39,7 @@ use crate::test_utils::{ NoOpHeightVotedStorage, TestProposalPart, }; -use crate::types::ValidatorId; +use crate::types::{Round, ValidatorId}; use crate::votes_threshold::QuorumType; use crate::RunConsensusArguments; @@ -74,6 +74,9 @@ const CHANNEL_SIZE: usize = 10; const HEIGHT_0: BlockNumber = BlockNumber(0); const HEIGHT_1: BlockNumber = BlockNumber(1); const HEIGHT_2: BlockNumber = BlockNumber(2); +const ROUND_0: Round = 0; +const ROUND_1: Round = 1; +const ROUND_2: Round = 2; const SYNC_RETRY_INTERVAL: Duration = Duration::from_millis(100); #[fixture] @@ -120,7 +123,7 @@ fn expect_validate_proposal(context: &mut MockTestContext, block_hash: Felt, tim .times(times); } -fn assert_decision(res: RunHeightRes, id: Felt, round: u32) { +fn assert_decision(res: RunHeightRes, id: Felt, round: Round) { match res { RunHeightRes::Decision(decision) => { assert_eq!(decision.block, ProposalCommitment(id)); @@ -143,19 +146,19 @@ async fn manager_multiple_heights_unordered(consensus_config: ConsensusConfig) { // Send messages for height 2 followed by those for height 1. send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_2, 0, *PROPOSER_ID))], + vec![TestProposalPart::Init(proposal_init(HEIGHT_2, ROUND_0, *PROPOSER_ID))], ) .await; - send(&mut sender, prevote(Some(Felt::TWO), HEIGHT_2, 0, *PROPOSER_ID)).await; - send(&mut sender, precommit(Some(Felt::TWO), HEIGHT_2, 0, *PROPOSER_ID)).await; + send(&mut sender, prevote(Some(Felt::TWO), HEIGHT_2, ROUND_0, *PROPOSER_ID)).await; + send(&mut sender, precommit(Some(Felt::TWO), HEIGHT_2, ROUND_0, *PROPOSER_ID)).await; send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, 0, *PROPOSER_ID))], + vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))], ) .await; - send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, 0, *PROPOSER_ID)).await; - send(&mut sender, precommit(Some(Felt::ONE), HEIGHT_1, 0, *PROPOSER_ID)).await; + send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_0, *PROPOSER_ID)).await; + send(&mut sender, precommit(Some(Felt::ONE), HEIGHT_1, ROUND_0, *PROPOSER_ID)).await; let mut context = MockTestContext::new(); // Run the manager for height 1. @@ -181,7 +184,7 @@ async fn manager_multiple_heights_unordered(consensus_config: ConsensusConfig) { ) .await .unwrap(); - assert_decision(decision, Felt::ONE, 0); + assert_decision(decision, Felt::ONE, ROUND_0); // Run the manager for height 2. expect_validate_proposal(&mut context, Felt::TWO, 1); @@ -194,7 +197,7 @@ async fn manager_multiple_heights_unordered(consensus_config: ConsensusConfig) { ) .await .unwrap(); - assert_decision(decision, Felt::TWO, 0); + assert_decision(decision, Felt::TWO, ROUND_0); } #[rstest] @@ -226,14 +229,14 @@ async fn run_consensus_sync(consensus_config: ConsensusConfig) { // Send messages for height 2. send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_2, 0, *PROPOSER_ID))], + vec![TestProposalPart::Init(proposal_init(HEIGHT_2, ROUND_0, *PROPOSER_ID))], ) .await; let TestSubscriberChannels { mock_network, subscriber_channels } = mock_register_broadcast_topic().unwrap(); let mut network_sender = mock_network.broadcasted_messages_sender; - send(&mut network_sender, prevote(Some(Felt::TWO), HEIGHT_2, 0, *PROPOSER_ID)).await; - send(&mut network_sender, precommit(Some(Felt::TWO), HEIGHT_2, 0, *PROPOSER_ID)).await; + send(&mut network_sender, prevote(Some(Felt::TWO), HEIGHT_2, ROUND_0, *PROPOSER_ID)).await; + send(&mut network_sender, precommit(Some(Felt::TWO), HEIGHT_2, ROUND_0, *PROPOSER_ID)).await; let run_consensus_args = RunConsensusArguments { consensus_config, start_active_height: HEIGHT_1, @@ -268,13 +271,13 @@ async fn test_timeouts(consensus_config: ConsensusConfig) { send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, 0, *PROPOSER_ID))], + vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))], ) .await; - send(&mut sender, prevote(None, HEIGHT_1, 0, *VALIDATOR_ID_2)).await; - send(&mut sender, prevote(None, HEIGHT_1, 0, *VALIDATOR_ID_3)).await; - send(&mut sender, precommit(None, HEIGHT_1, 0, *VALIDATOR_ID_2)).await; - send(&mut sender, precommit(None, HEIGHT_1, 0, *VALIDATOR_ID_3)).await; + send(&mut sender, prevote(None, HEIGHT_1, ROUND_0, *VALIDATOR_ID_2)).await; + send(&mut sender, prevote(None, HEIGHT_1, ROUND_0, *VALIDATOR_ID_3)).await; + send(&mut sender, precommit(None, HEIGHT_1, ROUND_0, *VALIDATOR_ID_2)).await; + send(&mut sender, precommit(None, HEIGHT_1, ROUND_0, *VALIDATOR_ID_3)).await; let mut context = MockTestContext::new(); context.expect_set_height_and_round().returning(move |_, _| ()); @@ -290,7 +293,7 @@ async fn test_timeouts(consensus_config: ConsensusConfig) { context .expect_broadcast() .times(1) - .withf(move |msg: &Vote| msg == &prevote(None, HEIGHT_1, 1, *VALIDATOR_ID)) + .withf(move |msg: &Vote| msg == &prevote(None, HEIGHT_1, ROUND_1, *VALIDATOR_ID)) .return_once(move |_| { timeout_send.send(()).unwrap(); Ok(()) @@ -313,7 +316,7 @@ async fn test_timeouts(consensus_config: ConsensusConfig) { ) .await .unwrap(); - assert_decision(decision, Felt::ONE, 1); + assert_decision(decision, Felt::ONE, ROUND_1); }); // Wait for the timeout to be triggered. @@ -322,14 +325,14 @@ async fn test_timeouts(consensus_config: ConsensusConfig) { // reach a decision. send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, 1, *PROPOSER_ID))], + vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_1, *PROPOSER_ID))], ) .await; - send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, 1, *PROPOSER_ID)).await; - send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, 1, *VALIDATOR_ID_2)).await; - send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, 1, *VALIDATOR_ID_3)).await; - send(&mut sender, precommit(Some(Felt::ONE), HEIGHT_1, 1, *VALIDATOR_ID_2)).await; - send(&mut sender, precommit(Some(Felt::ONE), HEIGHT_1, 1, *VALIDATOR_ID_3)).await; + send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_1, *PROPOSER_ID)).await; + send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_1, *VALIDATOR_ID_2)).await; + send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_1, *VALIDATOR_ID_3)).await; + send(&mut sender, precommit(Some(Felt::ONE), HEIGHT_1, ROUND_1, *VALIDATOR_ID_2)).await; + send(&mut sender, precommit(Some(Felt::ONE), HEIGHT_1, ROUND_1, *VALIDATOR_ID_3)).await; manager_handle.await.unwrap(); } @@ -346,7 +349,7 @@ async fn timely_message_handling(consensus_config: ConsensusConfig) { let (mut proposal_receiver_sender, mut proposal_receiver_receiver) = mpsc::channel(0); let (mut content_sender, content_receiver) = mpsc::channel(0); content_sender - .try_send(TestProposalPart::Init(proposal_init(HEIGHT_1, 0, *PROPOSER_ID))) + .try_send(TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))) .unwrap(); proposal_receiver_sender.try_send(content_receiver).unwrap(); @@ -356,7 +359,7 @@ async fn timely_message_handling(consensus_config: ConsensusConfig) { let mut subscriber_channels = subscriber_channels.into(); let mut vote_sender = mock_network.broadcasted_messages_sender; let metadata = BroadcastedMessageMetadata::get_test_instance(&mut get_rng()); - let vote = prevote(Some(Felt::TWO), HEIGHT_1, 0, *PROPOSER_ID); + let vote = prevote(Some(Felt::TWO), HEIGHT_1, ROUND_0, *PROPOSER_ID); // Fill up the buffer. while vote_sender.send((vote.clone(), metadata.clone())).now_or_never().is_some() {} @@ -401,29 +404,29 @@ async fn future_height_limit_caching_and_dropping(mut consensus_config: Consensu // Send proposal and votes for height 2 (should be dropped when processing height 0). send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_2, 0, *PROPOSER_ID))], + vec![TestProposalPart::Init(proposal_init(HEIGHT_2, ROUND_0, *PROPOSER_ID))], ) .await; - send(&mut sender, prevote(Some(Felt::TWO), HEIGHT_2, 0, *PROPOSER_ID)).await; - send(&mut sender, precommit(Some(Felt::TWO), HEIGHT_2, 0, *PROPOSER_ID)).await; + send(&mut sender, prevote(Some(Felt::TWO), HEIGHT_2, ROUND_0, *PROPOSER_ID)).await; + send(&mut sender, precommit(Some(Felt::TWO), HEIGHT_2, ROUND_0, *PROPOSER_ID)).await; // Send proposal and votes for height 1 (should be cached when processing height 0). send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, 0, *PROPOSER_ID))], + vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))], ) .await; - send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, 0, *PROPOSER_ID)).await; - send(&mut sender, precommit(Some(Felt::ONE), HEIGHT_1, 0, *PROPOSER_ID)).await; + send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_0, *PROPOSER_ID)).await; + send(&mut sender, precommit(Some(Felt::ONE), HEIGHT_1, ROUND_0, *PROPOSER_ID)).await; // Send proposal and votes for height 0 (current height - needed to reach consensus). send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_0, 0, *PROPOSER_ID))], + vec![TestProposalPart::Init(proposal_init(HEIGHT_0, ROUND_0, *PROPOSER_ID))], ) .await; - send(&mut sender, prevote(Some(Felt::ZERO), HEIGHT_0, 0, *PROPOSER_ID)).await; - send(&mut sender, precommit(Some(Felt::ZERO), HEIGHT_0, 0, *PROPOSER_ID)).await; + send(&mut sender, prevote(Some(Felt::ZERO), HEIGHT_0, ROUND_0, *PROPOSER_ID)).await; + send(&mut sender, precommit(Some(Felt::ZERO), HEIGHT_0, ROUND_0, *PROPOSER_ID)).await; let mut context = MockTestContext::new(); context.expect_try_sync().returning(|_| false); @@ -463,7 +466,7 @@ async fn future_height_limit_caching_and_dropping(mut consensus_config: Consensu ) .await .unwrap(); - assert_decision(decision, Felt::ZERO, 0); + assert_decision(decision, Felt::ZERO, ROUND_0); // Run height 1 - should succeed using cached proposal. let decision = manager @@ -475,7 +478,7 @@ async fn future_height_limit_caching_and_dropping(mut consensus_config: Consensu ) .await .unwrap(); - assert_decision(decision, Felt::ONE, 0); + assert_decision(decision, Felt::ONE, ROUND_0); // Run height 2 in background - shouldn't reach consensus because proposal was dropped. let manager_handle = tokio::spawn(async move { @@ -520,27 +523,27 @@ async fn current_height_round_limit_caching_and_dropping(mut consensus_config: C // Send proposals for rounds 0 and 1, proposal for round 1 should be dropped. send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, 0, *PROPOSER_ID))], + vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_0, *PROPOSER_ID))], ) .await; send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, 1, *PROPOSER_ID))], + vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_1, *PROPOSER_ID))], ) .await; // Send votes for round 1. These should be dropped because when state machine is in round 0, // round 1 > current_round(0) + future_round_limit(0). - send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, 1, *PROPOSER_ID)).await; - send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, 1, *VALIDATOR_ID_2)).await; - send(&mut sender, precommit(Some(Felt::ONE), HEIGHT_1, 1, *PROPOSER_ID)).await; - send(&mut sender, precommit(Some(Felt::ONE), HEIGHT_1, 1, *VALIDATOR_ID_2)).await; + send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_1, *PROPOSER_ID)).await; + send(&mut sender, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_1, *VALIDATOR_ID_2)).await; + send(&mut sender, precommit(Some(Felt::ONE), HEIGHT_1, ROUND_1, *PROPOSER_ID)).await; + send(&mut sender, precommit(Some(Felt::ONE), HEIGHT_1, ROUND_1, *VALIDATOR_ID_2)).await; // Send Nil votes for round 0 (current round). - send(&mut sender, prevote(None, HEIGHT_1, 0, *VALIDATOR_ID_2)).await; - send(&mut sender, prevote(None, HEIGHT_1, 0, *PROPOSER_ID)).await; - send(&mut sender, precommit(None, HEIGHT_1, 0, *VALIDATOR_ID_2)).await; - send(&mut sender, precommit(None, HEIGHT_1, 0, *PROPOSER_ID)).await; + send(&mut sender, prevote(None, HEIGHT_1, ROUND_0, *VALIDATOR_ID_2)).await; + send(&mut sender, prevote(None, HEIGHT_1, ROUND_0, *PROPOSER_ID)).await; + send(&mut sender, precommit(None, HEIGHT_1, ROUND_0, *VALIDATOR_ID_2)).await; + send(&mut sender, precommit(None, HEIGHT_1, ROUND_0, *PROPOSER_ID)).await; let mut context = MockTestContext::new(); context.expect_try_sync().returning(|_| false); @@ -558,14 +561,14 @@ async fn current_height_round_limit_caching_and_dropping(mut consensus_config: C context .expect_set_height_and_round() - .withf(|height, round| *height == HEIGHT_1 && *round == 1) + .withf(|height, round| *height == HEIGHT_1 && *round == ROUND_1) .times(1) .return_once(|_, _| { round1_trigger.send(()).unwrap(); }); context .expect_set_height_and_round() - .withf(|height, round| *height == HEIGHT_1 && *round == 2) + .withf(|height, round| *height == HEIGHT_1 && *round == ROUND_2) .times(1) .return_once(|_, _| { round2_trigger.send(()).unwrap(); @@ -585,10 +588,10 @@ async fn current_height_round_limit_caching_and_dropping(mut consensus_config: C tokio::spawn(async move { round1_wait.await.unwrap(); // Send Nil votes from other nodes for round 1. - send(&mut sender_clone1, prevote(None, HEIGHT_1, 1, *VALIDATOR_ID_2)).await; - send(&mut sender_clone1, prevote(None, HEIGHT_1, 1, *PROPOSER_ID)).await; - send(&mut sender_clone1, precommit(None, HEIGHT_1, 1, *VALIDATOR_ID_2)).await; - send(&mut sender_clone1, precommit(None, HEIGHT_1, 1, *PROPOSER_ID)).await; + send(&mut sender_clone1, prevote(None, HEIGHT_1, ROUND_1, *VALIDATOR_ID_2)).await; + send(&mut sender_clone1, prevote(None, HEIGHT_1, ROUND_1, *PROPOSER_ID)).await; + send(&mut sender_clone1, precommit(None, HEIGHT_1, ROUND_1, *VALIDATOR_ID_2)).await; + send(&mut sender_clone1, precommit(None, HEIGHT_1, ROUND_1, *PROPOSER_ID)).await; }); let mut sender_clone2 = sender.clone(); @@ -598,14 +601,16 @@ async fn current_height_round_limit_caching_and_dropping(mut consensus_config: C // Send proposal for round 2. send_proposal( &mut proposal_sender_clone, - vec![TestProposalPart::Init(proposal_init(HEIGHT_1, 2, *PROPOSER_ID))], + vec![TestProposalPart::Init(proposal_init(HEIGHT_1, ROUND_2, *PROPOSER_ID))], ) .await; // Send votes for round 2. - send(&mut sender_clone2, prevote(Some(Felt::ONE), HEIGHT_1, 2, *PROPOSER_ID)).await; - send(&mut sender_clone2, prevote(Some(Felt::ONE), HEIGHT_1, 2, *VALIDATOR_ID_2)).await; - send(&mut sender_clone2, precommit(Some(Felt::ONE), HEIGHT_1, 2, *PROPOSER_ID)).await; - send(&mut sender_clone2, precommit(Some(Felt::ONE), HEIGHT_1, 2, *VALIDATOR_ID_2)).await; + send(&mut sender_clone2, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_2, *PROPOSER_ID)).await; + send(&mut sender_clone2, prevote(Some(Felt::ONE), HEIGHT_1, ROUND_2, *VALIDATOR_ID_2)) + .await; + send(&mut sender_clone2, precommit(Some(Felt::ONE), HEIGHT_1, ROUND_2, *PROPOSER_ID)).await; + send(&mut sender_clone2, precommit(Some(Felt::ONE), HEIGHT_1, ROUND_2, *VALIDATOR_ID_2)) + .await; }); // Run height 1 - should reach consensus in round 2 because: @@ -623,7 +628,7 @@ async fn current_height_round_limit_caching_and_dropping(mut consensus_config: C ) .await .unwrap(); - assert_decision(decision, Felt::ONE, 2); + assert_decision(decision, Felt::ONE, ROUND_2); } #[rstest] @@ -778,11 +783,11 @@ async fn manager_runs_normally_when_height_is_greater_than_last_voted_height( // Send a proposal for the height we already voted on: send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(CURRENT_HEIGHT, 0, *PROPOSER_ID))], + vec![TestProposalPart::Init(proposal_init(CURRENT_HEIGHT, ROUND_0, *PROPOSER_ID))], ) .await; - send(&mut sender, prevote(Some(Felt::ONE), CURRENT_HEIGHT, 0, *PROPOSER_ID)).await; - send(&mut sender, precommit(Some(Felt::ONE), CURRENT_HEIGHT, 0, *PROPOSER_ID)).await; + send(&mut sender, prevote(Some(Felt::ONE), CURRENT_HEIGHT, ROUND_0, *PROPOSER_ID)).await; + send(&mut sender, precommit(Some(Felt::ONE), CURRENT_HEIGHT, ROUND_0, *PROPOSER_ID)).await; let mut context = MockTestContext::new(); // Sync will never succeed so we will proceed to run consensus (during which try_sync is called @@ -810,7 +815,7 @@ async fn manager_runs_normally_when_height_is_greater_than_last_voted_height( .await .unwrap(); - assert_decision(decision, Felt::ONE, 0); + assert_decision(decision, Felt::ONE, ROUND_0); } #[rstest] @@ -833,11 +838,11 @@ async fn manager_waits_until_height_passes_last_voted_height(consensus_config: C // Send a proposal for the height we already voted on: send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(LAST_VOTED_HEIGHT, 0, *PROPOSER_ID))], + vec![TestProposalPart::Init(proposal_init(LAST_VOTED_HEIGHT, ROUND_0, *PROPOSER_ID))], ) .await; - send(&mut sender, prevote(Some(Felt::ONE), LAST_VOTED_HEIGHT, 0, *PROPOSER_ID)).await; - send(&mut sender, precommit(Some(Felt::ONE), LAST_VOTED_HEIGHT, 0, *PROPOSER_ID)).await; + send(&mut sender, prevote(Some(Felt::ONE), LAST_VOTED_HEIGHT, ROUND_0, *PROPOSER_ID)).await; + send(&mut sender, precommit(Some(Felt::ONE), LAST_VOTED_HEIGHT, ROUND_0, *PROPOSER_ID)).await; let mut context = MockTestContext::new(); // At the last voted height we expect the manager to halt until it can get the last voted height @@ -909,7 +914,7 @@ async fn writes_voted_height_to_storage(consensus_config: ConsensusConfig) { context .expect_broadcast() .times(1) - .withf(move |msg: &Vote| msg == &prevote(Some(block_id.0), HEIGHT, 0, *VALIDATOR_ID)) + .withf(move |msg: &Vote| msg == &prevote(Some(block_id.0), HEIGHT, ROUND_0, *VALIDATOR_ID)) .in_sequence(&mut storage_before_broadcast_sequence) .returning(move |_| { if let Some(tx) = prevote_tx_for_callback.lock().unwrap().take() { @@ -934,7 +939,9 @@ async fn writes_voted_height_to_storage(consensus_config: ConsensusConfig) { context .expect_broadcast() .times(1) - .withf(move |msg: &Vote| msg == &precommit(Some(block_id.0), HEIGHT, 0, *VALIDATOR_ID)) + .withf(move |msg: &Vote| { + msg == &precommit(Some(block_id.0), HEIGHT, ROUND_0, *VALIDATOR_ID) + }) .in_sequence(&mut storage_before_broadcast_sequence) .returning(move |_| { if let Some(tx) = precommit_tx_for_callback.lock().unwrap().take() { @@ -953,7 +960,7 @@ async fn writes_voted_height_to_storage(consensus_config: ConsensusConfig) { // Send proposal first send_proposal( &mut proposal_receiver_sender, - vec![TestProposalPart::Init(proposal_init(HEIGHT, 0, *PROPOSER_ID))], + vec![TestProposalPart::Init(proposal_init(HEIGHT, ROUND_0, *PROPOSER_ID))], ) .await; @@ -972,20 +979,33 @@ async fn writes_voted_height_to_storage(consensus_config: ConsensusConfig) { prevote_rx.await.unwrap(); // Now send other prevotes to reach quorum - send(&mut sender_clone, prevote(Some(block_id_for_votes.0), HEIGHT, 0, *PROPOSER_ID)).await; - send(&mut sender_clone, prevote(Some(block_id_for_votes.0), HEIGHT, 0, *VALIDATOR_ID_2)) - .await; - send(&mut sender_clone, prevote(Some(block_id_for_votes.0), HEIGHT, 0, *VALIDATOR_ID_3)) + send(&mut sender_clone, prevote(Some(block_id_for_votes.0), HEIGHT, ROUND_0, *PROPOSER_ID)) .await; + send( + &mut sender_clone, + prevote(Some(block_id_for_votes.0), HEIGHT, ROUND_0, *VALIDATOR_ID_2), + ) + .await; + send( + &mut sender_clone, + prevote(Some(block_id_for_votes.0), HEIGHT, ROUND_0, *VALIDATOR_ID_3), + ) + .await; // Wait for validator to broadcast precommit after reaching prevote quorum precommit_rx.await.unwrap(); // Now send other precommits to reach decision - send(&mut sender_clone, precommit(Some(block_id_for_votes.0), HEIGHT, 0, *PROPOSER_ID)) - .await; - send(&mut sender_clone, precommit(Some(block_id_for_votes.0), HEIGHT, 0, *VALIDATOR_ID_2)) - .await; + send( + &mut sender_clone, + precommit(Some(block_id_for_votes.0), HEIGHT, ROUND_0, *PROPOSER_ID), + ) + .await; + send( + &mut sender_clone, + precommit(Some(block_id_for_votes.0), HEIGHT, ROUND_0, *VALIDATOR_ID_2), + ) + .await; }); // Run height - this should trigger storage writes before broadcasts @@ -994,5 +1014,5 @@ async fn writes_voted_height_to_storage(consensus_config: ConsensusConfig) { .await .unwrap(); - assert_decision(decision, block_id.0, 0); + assert_decision(decision, block_id.0, ROUND_0); } diff --git a/crates/apollo_consensus/src/single_height_consensus_test.rs b/crates/apollo_consensus/src/single_height_consensus_test.rs index e4752ef53f3..958a7ef3593 100644 --- a/crates/apollo_consensus/src/single_height_consensus_test.rs +++ b/crates/apollo_consensus/src/single_height_consensus_test.rs @@ -10,7 +10,7 @@ use super::SingleHeightConsensus; use crate::single_height_consensus::ShcReturn; use crate::state_machine::{SMRequest, StateMachineEvent, Step}; use crate::test_utils::{precommit, prevote, TestBlock}; -use crate::types::{ProposalCommitment, ValidatorId}; +use crate::types::{ProposalCommitment, Round, ValidatorId}; use crate::votes_threshold::QuorumType; lazy_static! { @@ -27,6 +27,9 @@ lazy_static! { static ref TIMEOUTS: TimeoutsConfig = TimeoutsConfig::default(); } const HEIGHT_0: BlockNumber = BlockNumber(0); +const ROUND_0: Round = 0; +const ROUND_1: Round = 1; + fn get_proposal_init_for_height(height: BlockNumber) -> ProposalInit { ProposalInit { height, ..*PROPOSAL_INIT } } @@ -45,13 +48,13 @@ fn proposer() { // Start should request to build proposal. let start_ret = shc.start(&leader_fn).unwrap(); assert_matches!(start_ret, ShcReturn::Requests(mut reqs) => { - assert_matches!(reqs.pop_front(), Some(SMRequest::StartBuildProposal(0))); + assert_matches!(reqs.pop_front(), Some(SMRequest::StartBuildProposal(ROUND_0))); assert!(reqs.is_empty()); }); // After FinishedBuilding, expect a prevote broadcast request. let ret = shc - .handle_event(&leader_fn, StateMachineEvent::FinishedBuilding(Some(BLOCK.id), 0)) + .handle_event(&leader_fn, StateMachineEvent::FinishedBuilding(Some(BLOCK.id), ROUND_0)) .unwrap(); assert_matches!(ret, ShcReturn::Requests(mut reqs) => { assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Prevote); @@ -60,24 +63,24 @@ fn proposer() { // Receive two prevotes from other validators to reach prevote quorum. let _ = shc - .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, 0, *VALIDATOR_ID_1)) + .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *VALIDATOR_ID_1)) .unwrap(); let ret = shc - .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, 0, *VALIDATOR_ID_2)) + .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *VALIDATOR_ID_2)) .unwrap(); // Expect a precommit broadcast request present. assert_matches!(ret, ShcReturn::Requests(mut reqs) => { - assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 0))); + assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, ROUND_0))); assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit); assert!(reqs.is_empty()); }); // Now provide precommit votes to reach decision. let _ = shc - .handle_vote(&leader_fn, precommit(Some(BLOCK.id.0), HEIGHT_0, 0, *VALIDATOR_ID_1)) + .handle_vote(&leader_fn, precommit(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *VALIDATOR_ID_1)) .unwrap(); let decision = shc - .handle_vote(&leader_fn, precommit(Some(BLOCK.id.0), HEIGHT_0, 0, *VALIDATOR_ID_2)) + .handle_vote(&leader_fn, precommit(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *VALIDATOR_ID_2)) .unwrap(); match decision { ShcReturn::Decision(d) => assert_eq!(d.block, BLOCK.id), @@ -126,10 +129,10 @@ fn validator(repeat_proposal: bool) { // Reach prevote quorum with two other validators. let _ = shc - .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, 0, *VALIDATOR_ID_2)) + .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *VALIDATOR_ID_2)) .unwrap(); let ret = shc - .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, 0, *VALIDATOR_ID_3)) + .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *VALIDATOR_ID_3)) .unwrap(); // Expect a precommit broadcast request present. assert_matches!(ret, ShcReturn::Requests(mut reqs) => { @@ -140,10 +143,10 @@ fn validator(repeat_proposal: bool) { // Now provide precommit votes to reach decision. let _ = shc - .handle_vote(&leader_fn, precommit(Some(BLOCK.id.0), HEIGHT_0, 0, *PROPOSER_ID)) + .handle_vote(&leader_fn, precommit(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *PROPOSER_ID)) .unwrap(); let decision = shc - .handle_vote(&leader_fn, precommit(Some(BLOCK.id.0), HEIGHT_0, 0, *VALIDATOR_ID_2)) + .handle_vote(&leader_fn, precommit(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *VALIDATOR_ID_2)) .unwrap(); match decision { ShcReturn::Decision(d) => assert_eq!(d.block, BLOCK.id), @@ -172,10 +175,11 @@ fn vote_twice(same_vote: bool) { ) .unwrap(); - let _ = - shc.handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, 0, *PROPOSER_ID)).unwrap(); + let _ = shc + .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *PROPOSER_ID)) + .unwrap(); let res = shc - .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, 0, *VALIDATOR_ID_2)) + .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *VALIDATOR_ID_2)) .unwrap(); // On quorum of prevotes, expect a precommit broadcast request. assert_matches!(res, ShcReturn::Requests(mut reqs) => { @@ -185,12 +189,12 @@ fn vote_twice(same_vote: bool) { }); // Precommit handling towards decision. - let first_vote = precommit(Some(BLOCK.id.0), HEIGHT_0, 0, *PROPOSER_ID); + let first_vote = precommit(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *PROPOSER_ID); let _ = shc.handle_vote(&leader_fn, first_vote.clone()).unwrap(); let second_vote = if same_vote { first_vote.clone() } else { - precommit(Some(Felt::TWO), HEIGHT_0, 0, *PROPOSER_ID) + precommit(Some(Felt::TWO), HEIGHT_0, ROUND_0, *PROPOSER_ID) }; // When same_vote is true, this is a duplicate precommit from PROPOSER_ID (same as first_vote). // When same_vote is false, this is an equivocation (different vote from PROPOSER_ID). @@ -201,8 +205,8 @@ fn vote_twice(same_vote: bool) { // decision. let res = shc.handle_vote(&leader_fn, second_vote.clone()).unwrap(); assert_matches!(res, ShcReturn::Requests(r) if r.is_empty()); - let decision = - shc.handle_vote(&leader_fn, precommit(Some(BLOCK.id.0), HEIGHT_0, 0, *VALIDATOR_ID_3)); + let decision = shc + .handle_vote(&leader_fn, precommit(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *VALIDATOR_ID_3)); match decision { Ok(ShcReturn::Decision(d)) => assert_eq!(d.block, BLOCK.id), _ => panic!("Expected decision"), @@ -224,7 +228,7 @@ fn rebroadcast_votes() { let _ = shc.start(&leader_fn).unwrap(); let ret = shc - .handle_event(&leader_fn, StateMachineEvent::FinishedBuilding(Some(BLOCK.id), 0)) + .handle_event(&leader_fn, StateMachineEvent::FinishedBuilding(Some(BLOCK.id), ROUND_0)) .unwrap(); assert_matches!(ret, ShcReturn::Requests(mut reqs) => { assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Prevote); @@ -233,53 +237,60 @@ fn rebroadcast_votes() { // Receive two prevotes from other validators to reach prevote quorum at round 0. let _ = shc - .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, 0, *VALIDATOR_ID_1)) + .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *VALIDATOR_ID_1)) .unwrap(); let ret = shc - .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, 0, *VALIDATOR_ID_2)) + .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *VALIDATOR_ID_2)) .unwrap(); // Expect a precommit broadcast at round 0. assert_matches!(ret, ShcReturn::Requests(mut reqs) => { - assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 0))); - assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit && v.round == 0); + assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, ROUND_0))); + assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit && v.round == ROUND_0); assert!(reqs.is_empty()); }); // Advance with NIL precommits from peers (no decision) -> expect scheduling of precommit // timeout. - let _ = shc.handle_vote(&leader_fn, precommit(None, HEIGHT_0, 0, *VALIDATOR_ID_1)).unwrap(); - let _ = shc.handle_vote(&leader_fn, precommit(None, HEIGHT_0, 0, *VALIDATOR_ID_2)).unwrap(); + let _ = + shc.handle_vote(&leader_fn, precommit(None, HEIGHT_0, ROUND_0, *VALIDATOR_ID_1)).unwrap(); + let _ = + shc.handle_vote(&leader_fn, precommit(None, HEIGHT_0, ROUND_0, *VALIDATOR_ID_2)).unwrap(); // Timeout at precommit(0) -> expect a prevote broadcast for round 1. - let ret = shc.handle_event(&leader_fn, StateMachineEvent::TimeoutPrecommit(0)).unwrap(); + let ret = shc.handle_event(&leader_fn, StateMachineEvent::TimeoutPrecommit(ROUND_0)).unwrap(); assert_matches!(ret, ShcReturn::Requests(mut reqs) => { - assert_matches!(reqs.pop_front(), Some(SMRequest::Repropose(proposal_id, init)) if proposal_id == BLOCK.id && init.round == 1 && init.valid_round == Some(0)); - assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Prevote && v.round == 1); + assert_matches!(reqs.pop_front(), Some(SMRequest::Repropose(proposal_id, init)) if proposal_id == BLOCK.id && init.round == ROUND_1 && init.valid_round == Some(ROUND_0)); + assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Prevote && v.round == ROUND_1); assert!(reqs.is_empty()); }); // Reach prevote quorum at round 1 with two other validators. let _ = shc - .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, 1, *VALIDATOR_ID_2)) + .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, ROUND_1, *VALIDATOR_ID_2)) .unwrap(); let ret = shc - .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, 1, *VALIDATOR_ID_3)) + .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, ROUND_1, *VALIDATOR_ID_3)) .unwrap(); assert_matches!(ret, ShcReturn::Requests(mut reqs) => { - assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 1))); - assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit && v.round == 1); + assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, ROUND_1))); + assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit && v.round == ROUND_1); assert!(reqs.is_empty()); }); // Rebroadcast with older vote (round 0) - should be ignored (no broadcast, no task). let ret = shc.handle_event( &leader_fn, - StateMachineEvent::VoteBroadcasted(precommit(Some(BLOCK.id.0), HEIGHT_0, 0, *PROPOSER_ID)), + StateMachineEvent::VoteBroadcasted(precommit( + Some(BLOCK.id.0), + HEIGHT_0, + ROUND_0, + *PROPOSER_ID, + )), ); assert_matches!(ret.unwrap(), ShcReturn::Requests(r) if r.is_empty()); // Rebroadcast with current round (round 1) - should broadcast. - let rebroadcast_vote = precommit(Some(BLOCK.id.0), HEIGHT_0, 1, *PROPOSER_ID); + let rebroadcast_vote = precommit(Some(BLOCK.id.0), HEIGHT_0, ROUND_1, *PROPOSER_ID); let ret = shc .handle_event(&leader_fn, StateMachineEvent::VoteBroadcasted(rebroadcast_vote.clone())) .unwrap(); @@ -303,48 +314,50 @@ fn repropose() { let _ = shc.start(&leader_fn).unwrap(); // After building the proposal, the proposer broadcasts a prevote for round 0. let ret = shc - .handle_event(&leader_fn, StateMachineEvent::FinishedBuilding(Some(BLOCK.id), 0)) + .handle_event(&leader_fn, StateMachineEvent::FinishedBuilding(Some(BLOCK.id), ROUND_0)) .unwrap(); // Expect a BroadcastVote(Prevote) request for round 0. assert_matches!(ret, ShcReturn::Requests(mut reqs) => { - assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Prevote && v.round == 0); + assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Prevote && v.round == ROUND_0); assert!(reqs.is_empty()); }); // A single prevote from another validator does not yet cause quorum. let ret = shc - .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, 0, *VALIDATOR_ID_1)) + .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *VALIDATOR_ID_1)) .unwrap(); // No new requests are expected at this point. assert_matches!(ret, ShcReturn::Requests(reqs) if reqs.is_empty()); // Reaching prevote quorum with a second external prevote; proposer will broadcast a precommit // and schedule a prevote timeout for round 0. let ret = shc - .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, 0, *VALIDATOR_ID_2)) + .handle_vote(&leader_fn, prevote(Some(BLOCK.id.0), HEIGHT_0, ROUND_0, *VALIDATOR_ID_2)) .unwrap(); // Expect ScheduleTimeout(Step::Prevote, 0) and BroadcastVote(Precommit). assert_matches!(ret, ShcReturn::Requests(mut reqs) => { - assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, 0))); - assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit && v.round == 0); + assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Prevote, ROUND_0))); + assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Precommit && v.round == ROUND_0); assert!(reqs.is_empty()); }); // receiving Nil precommit requests and then decision on new round; just assert no panic and // decisions arrive after quorum. - let _ = shc.handle_vote(&leader_fn, precommit(None, HEIGHT_0, 0, *VALIDATOR_ID_1)).unwrap(); - let ret = shc.handle_vote(&leader_fn, precommit(None, HEIGHT_0, 0, *VALIDATOR_ID_2)).unwrap(); + let _ = + shc.handle_vote(&leader_fn, precommit(None, HEIGHT_0, ROUND_0, *VALIDATOR_ID_1)).unwrap(); + let ret = + shc.handle_vote(&leader_fn, precommit(None, HEIGHT_0, ROUND_0, *VALIDATOR_ID_2)).unwrap(); // assert that ret is ScheduleTimeoutPrecommit assert_matches!(ret, ShcReturn::Requests(mut reqs) => { - assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Precommit, 0))); + assert_matches!(reqs.pop_front(), Some(SMRequest::ScheduleTimeout(Step::Precommit, ROUND_0))); assert!(reqs.is_empty()); }); // No precommit quorum is reached. On TimeoutPrecommit(0) the proposer advances to round 1 with // a valid value (valid_round = Some(0)) and reproposes the same block, then broadcasts a // new prevote for round 1. - let ret = shc.handle_event(&leader_fn, StateMachineEvent::TimeoutPrecommit(0)).unwrap(); + let ret = shc.handle_event(&leader_fn, StateMachineEvent::TimeoutPrecommit(ROUND_0)).unwrap(); // Expect Repropose with init.round == 1, init.valid_round == Some(0), and a // BroadcastVote(Prevote) for round 1. assert_matches!(ret, ShcReturn::Requests(mut reqs) => { - assert_matches!(reqs.pop_front(), Some(SMRequest::Repropose(proposal_id, init)) if proposal_id == BLOCK.id && init.round == 1 && init.valid_round == Some(0)); - assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Prevote && v.round == 1); + assert_matches!(reqs.pop_front(), Some(SMRequest::Repropose(proposal_id, init)) if proposal_id == BLOCK.id && init.round == ROUND_1 && init.valid_round == Some(ROUND_0)); + assert_matches!(reqs.pop_front(), Some(SMRequest::BroadcastVote(v)) if v.vote_type == VoteType::Prevote && v.round == ROUND_1); assert!(reqs.is_empty()); }); } diff --git a/crates/apollo_consensus/src/test_utils.rs b/crates/apollo_consensus/src/test_utils.rs index a9608aac39e..085a0580ffd 100644 --- a/crates/apollo_consensus/src/test_utils.rs +++ b/crates/apollo_consensus/src/test_utils.rs @@ -104,7 +104,7 @@ mock! { pub fn prevote( block_felt: Option, height: BlockNumber, - round: u32, + round: Round, voter: ValidatorId, ) -> Vote { let proposal_commitment = block_felt.map(ProposalCommitment); @@ -114,13 +114,14 @@ pub fn prevote( pub fn precommit( block_felt: Option, height: BlockNumber, - round: u32, + round: Round, voter: ValidatorId, ) -> Vote { let proposal_commitment = block_felt.map(ProposalCommitment); Vote { vote_type: VoteType::Precommit, height, round, proposal_commitment, voter } } -pub fn proposal_init(height: BlockNumber, round: u32, proposer: ValidatorId) -> ProposalInit { + +pub fn proposal_init(height: BlockNumber, round: Round, proposer: ValidatorId) -> ProposalInit { ProposalInit { height, round, proposer, ..Default::default() } } diff --git a/crates/apollo_consensus/src/types.rs b/crates/apollo_consensus/src/types.rs index 07454929dbc..210e75da222 100644 --- a/crates/apollo_consensus/src/types.rs +++ b/crates/apollo_consensus/src/types.rs @@ -8,7 +8,7 @@ use apollo_network::network_manager::{ GenericReceiver, }; use apollo_network_types::network_types::BroadcastedMessageMetadata; -pub use apollo_protobuf::consensus::ProposalCommitment; +pub use apollo_protobuf::consensus::{ProposalCommitment, Round}; use apollo_protobuf::consensus::{ProposalInit, Vote}; use apollo_protobuf::converters::ProtobufConversionError; use async_trait::async_trait; @@ -22,7 +22,6 @@ use starknet_api::core::ContractAddress; /// signatures. // TODO(matan): Determine the actual type of NodeId. pub type ValidatorId = ContractAddress; -pub type Round = u32; /// Interface for consensus to call out to the node. /// diff --git a/crates/apollo_protobuf/src/consensus.rs b/crates/apollo_protobuf/src/consensus.rs index 404080e4d0a..0fb0df3ce60 100644 --- a/crates/apollo_protobuf/src/consensus.rs +++ b/crates/apollo_protobuf/src/consensus.rs @@ -14,6 +14,9 @@ use starknet_api::data_availability::L1DataAvailabilityMode; use starknet_api::hash::StarkHash; use crate::converters::ProtobufConversionError; + +pub type Round = u32; + #[derive( Debug, Default, @@ -48,7 +51,7 @@ pub enum VoteType { pub struct Vote { pub vote_type: VoteType, pub height: BlockNumber, - pub round: u32, + pub round: Round, pub proposal_commitment: Option, pub voter: ContractAddress, } @@ -72,9 +75,9 @@ pub struct ProposalInit { /// The height of the consensus (block number). pub height: BlockNumber, /// The current round of the consensus. - pub round: u32, + pub round: Round, /// The last round that was valid. - pub valid_round: Option, + pub valid_round: Option, /// Address of the one who proposed the block. pub proposer: ContractAddress, }