88#[ path = "single_height_consensus_test.rs" ]
99mod single_height_consensus_test;
1010
11- use std:: collections:: hash_map:: Entry ;
12- use std:: collections:: { HashMap , VecDeque } ;
11+ use std:: collections:: { HashSet , VecDeque } ;
1312use std:: time:: Duration ;
1413
1514const REBROADCAST_LOG_PERIOD_SECS : u64 = 10 ;
@@ -136,7 +135,8 @@ pub(crate) struct SingleHeightConsensus {
136135 validators : Vec < ValidatorId > ,
137136 timeouts : TimeoutsConfig ,
138137 state_machine : StateMachine ,
139- proposals : HashMap < Round , Option < ProposalCommitment > > ,
138+ // Tracks rounds for which we started validating a proposal to avoid duplicate validations.
139+ pending_validation_rounds : HashSet < Round > ,
140140 last_prevote : Option < Vote > ,
141141 last_precommit : Option < Vote > ,
142142}
@@ -158,7 +158,7 @@ impl SingleHeightConsensus {
158158 validators,
159159 timeouts,
160160 state_machine,
161- proposals : HashMap :: new ( ) ,
161+ pending_validation_rounds : HashSet :: new ( ) ,
162162 last_prevote : None ,
163163 last_precommit : None ,
164164 }
@@ -204,20 +204,25 @@ impl SingleHeightConsensus {
204204 warn ! ( "Invalid proposer: expected {:?}, got {:?}" , proposer_id, init. proposer) ;
205205 return Ok ( ShcReturn :: Tasks ( Vec :: new ( ) ) ) ;
206206 }
207- let Entry :: Vacant ( proposal_entry) = self . proposals . entry ( init. round ) else {
208- warn ! ( "Round {} already has a proposal, ignoring" , init. round) ;
207+ // Avoid duplicate validations:
208+ // - If SM already has an entry for this round, a (re)proposal was already recorded.
209+ // - If we already started validating this round, ignore repeats.
210+ if self . state_machine . proposals_ref ( ) . get ( & init. round ) . is_some ( )
211+ || self . pending_validation_rounds . contains ( & init. round )
212+ {
213+ warn ! ( "Round {} already handled a proposal, ignoring" , init. round) ;
209214 return Ok ( ShcReturn :: Tasks ( Vec :: new ( ) ) ) ;
210- } ;
215+ }
211216 let timeout = self . timeouts . get_proposal_timeout ( init. round ) ;
212217 info ! (
213218 "Accepting {init:?}. node_round: {}, timeout: {timeout:?}" ,
214219 self . state_machine. round( )
215220 ) ;
216221 CONSENSUS_PROPOSALS_VALID_INIT . increment ( 1 ) ;
217222
218- // Since validating the proposal is non-blocking, we want to avoid validating the same round
219- // twice in parallel. This could be caused by a network repeat or a malicious spam attack .
220- proposal_entry . insert ( None ) ;
223+ // Since validating the proposal is non-blocking, avoid validating the same round twice in
224+ // parallel (e.g., due to repeats or spam) .
225+ self . pending_validation_rounds . insert ( init . round ) ;
221226 let block_receiver = context. validate_proposal ( init, timeout, p2p_messages_receiver) . await ;
222227 context. set_height_and_round ( height, self . state_machine . round ( ) ) . await ;
223228 Ok ( ShcReturn :: Tasks ( vec ! [ ShcTask :: ValidateProposal ( init, block_receiver) ] ) )
@@ -287,23 +292,21 @@ impl SingleHeightConsensus {
287292 CONSENSUS_PROPOSALS_INVALID . increment ( 1 ) ;
288293 }
289294
290- // Retaining the entry for this round prevents us from receiving another proposal on
291- // this round. While this prevents spam attacks it also prevents re-receiving after
292- // a network issue.
293- let old = self . proposals . insert ( round, proposal_id) ;
294- assert ! (
295- old. is_some_and( |p| p. is_none( ) ) ,
296- "Proposal entry for round {round} should exist and be empty: {old:?}"
297- ) ;
295+ // Validation for this round finished; clear the pending marker.
296+ self . pending_validation_rounds . remove ( & round) ;
298297 let sm_events = self . state_machine . handle_event ( event, & leader_fn) ;
299298 self . handle_state_machine_events ( context, sm_events) . await
300299 }
301300 StateMachineEvent :: GetProposal ( proposal_id, round) => {
302301 if proposal_id. is_none ( ) {
303302 CONSENSUS_BUILD_PROPOSAL_FAILED . increment ( 1 ) ;
304303 }
305- let old = self . proposals . insert ( round, proposal_id) ;
306- assert ! ( old. is_none( ) , "There should be no entry for round {round} when proposing" ) ;
304+ // Ensure SM has no proposal recorded yet for this round when proposing.
305+ assert ! (
306+ self . state_machine. proposals_ref( ) . get( & round) . is_none( ) ,
307+ "There should be no entry for round {round} when proposing"
308+ ) ;
309+
307310 assert_eq ! (
308311 round,
309312 self . state_machine. round( ) ,
@@ -472,15 +475,13 @@ impl SingleHeightConsensus {
472475
473476 // Make sure there is an existing proposal for the valid round and it matches the proposal
474477 // ID.
475- let existing = self . proposals . get ( & valid_round) . and_then ( |& inner| inner ) ;
478+ let existing = self . state_machine . proposals_ref ( ) . get ( & valid_round) . and_then ( |( id , _ ) | * id ) ;
476479 assert ! (
477480 existing. is_some_and( |id| id == proposal_id) ,
478481 "A proposal with ID {proposal_id:?} should exist for valid_round: {valid_round}. \
479482 Found: {existing:?}",
480483 ) ;
481484
482- let old = self . proposals . insert ( round, Some ( proposal_id) ) ;
483- assert ! ( old. is_none( ) , "There should be no proposal for round {round}." ) ;
484485 let init = ProposalInit {
485486 height : self . state_machine . height ( ) ,
486487 round,
@@ -544,14 +545,11 @@ impl SingleHeightConsensus {
544545 ) )
545546 } ;
546547 let block = self
547- . proposals
548- . remove ( & round)
549- . ok_or_else ( || invalid_decision ( "No proposal entry for this round" . to_string ( ) ) ) ?
550- . ok_or_else ( || {
551- invalid_decision (
552- "Proposal is invalid or validations haven't yet completed" . to_string ( ) ,
553- )
554- } ) ?;
548+ . state_machine
549+ . proposals_ref ( )
550+ . get ( & round)
551+ . and_then ( |( id, _) | * id)
552+ . ok_or_else ( || invalid_decision ( "No proposal entry for this round" . to_string ( ) ) ) ?;
555553 if block != proposal_id {
556554 return Err ( invalid_decision ( format ! (
557555 "StateMachine proposal commitment should match the stored block. Shc.block_id: \
0 commit comments