Skip to content

Commit a109ad6

Browse files
apollo_consensus: limit messages from future heights or rounds (#8683) (#10239)
1 parent d12ead4 commit a109ad6

File tree

3 files changed

+230
-18
lines changed

3 files changed

+230
-18
lines changed

crates/apollo_consensus/src/manager.rs

Lines changed: 107 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -22,7 +22,7 @@ use futures::channel::mpsc;
2222
use futures::stream::FuturesUnordered;
2323
use futures::{FutureExt, StreamExt};
2424
use starknet_api::block::BlockNumber;
25-
use tracing::{debug, error, info, instrument, trace};
25+
use tracing::{debug, error, info, instrument, trace, warn};
2626

2727
use crate::metrics::{
2828
register_metrics,
@@ -407,27 +407,36 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
407407

408408
match proposal_init.height.cmp(&height) {
409409
std::cmp::Ordering::Greater => {
410-
debug!("Received a proposal for a future height. {:?}", proposal_init);
411-
// Note: new proposals with the same height/round will be ignored.
412-
//
413-
// TODO(matan): This only work for trusted peers. In the case of possibly malicious
414-
// peers this is a possible DoS attack (malicious users can insert
415-
// invalid/bad/malicious proposals before "good" nodes can propose).
416-
//
417-
// When moving to version 1.0 make sure this is addressed.
418-
self.cached_proposals
419-
.entry(proposal_init.height.0)
420-
.or_default()
421-
.entry(proposal_init.round)
422-
.or_insert((proposal_init, content_receiver));
410+
if self.should_cache_proposal(&height, 0, &proposal_init) {
411+
debug!("Received a proposal for a future height. {:?}", proposal_init);
412+
// Note: new proposals with the same height/round will be ignored.
413+
//
414+
// TODO(matan): This only work for trusted peers. In the case of possibly
415+
// malicious peers this is a possible DoS attack (malicious
416+
// users can insert invalid/bad/malicious proposals before
417+
// "good" nodes can propose).
418+
//
419+
// When moving to version 1.0 make sure this is addressed.
420+
self.cached_proposals
421+
.entry(proposal_init.height.0)
422+
.or_default()
423+
.entry(proposal_init.round)
424+
.or_insert((proposal_init, content_receiver));
425+
}
423426
Ok(ShcReturn::Tasks(Vec::new()))
424427
}
425428
std::cmp::Ordering::Less => {
426429
trace!("Drop proposal from past height. {:?}", proposal_init);
427430
Ok(ShcReturn::Tasks(Vec::new()))
428431
}
429432
std::cmp::Ordering::Equal => match shc {
430-
Some(shc) => shc.handle_proposal(context, proposal_init, content_receiver).await,
433+
Some(shc) => {
434+
if self.should_cache_proposal(&height, shc.current_round(), &proposal_init) {
435+
shc.handle_proposal(context, proposal_init, content_receiver).await
436+
} else {
437+
Ok(ShcReturn::Tasks(Vec::new()))
438+
}
439+
}
431440
None => {
432441
trace!("Drop proposal from just completed height. {:?}", proposal_init);
433442
Ok(ShcReturn::Tasks(Vec::new()))
@@ -481,16 +490,24 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
481490
// 2. Parallel proposals - we may send/receive a proposal for (H+1, 0).
482491
match message.height.cmp(&height.0) {
483492
std::cmp::Ordering::Greater => {
484-
trace!("Cache message for a future height. {:?}", message);
485-
self.future_votes.entry(message.height).or_default().push(message);
493+
if self.should_cache_vote(&height, 0, &message) {
494+
trace!("Cache message for a future height. {:?}", message);
495+
self.future_votes.entry(message.height).or_default().push(message);
496+
}
486497
Ok(ShcReturn::Tasks(Vec::new()))
487498
}
488499
std::cmp::Ordering::Less => {
489500
trace!("Drop message from past height. {:?}", message);
490501
Ok(ShcReturn::Tasks(Vec::new()))
491502
}
492503
std::cmp::Ordering::Equal => match shc {
493-
Some(shc) => shc.handle_vote(context, message).await,
504+
Some(shc) => {
505+
if self.should_cache_vote(&height, shc.current_round(), &message) {
506+
shc.handle_vote(context, message).await
507+
} else {
508+
Ok(ShcReturn::Tasks(Vec::new()))
509+
}
510+
}
494511
None => {
495512
trace!("Drop message from just completed height. {:?}", message);
496513
Ok(ShcReturn::Tasks(Vec::new()))
@@ -548,4 +565,76 @@ impl<ContextT: ConsensusContext> MultiHeightManager<ContextT> {
548565
let max_cached_block_number = self.cached_proposals.keys().max().unwrap_or(&height.0);
549566
CONSENSUS_MAX_CACHED_BLOCK_NUMBER.set_lossy(*max_cached_block_number);
550567
}
568+
569+
fn should_cache_msg(
570+
&self,
571+
current_height: &BlockNumber,
572+
current_round: u32,
573+
msg_height: u64,
574+
msg_round: u32,
575+
msg_description: &str,
576+
) -> bool {
577+
let height_diff = msg_height.saturating_sub(current_height.0);
578+
let round_diff = msg_round.saturating_sub(current_round);
579+
let mut should_cache = true;
580+
581+
// Check height limits first
582+
if height_diff > self.consensus_config.static_config.future_height_limit.into() {
583+
should_cache = false;
584+
}
585+
586+
// Check round limits based on height
587+
if height_diff == 0 {
588+
// For current height, check against current round + future_round_limit
589+
if round_diff > self.consensus_config.static_config.future_round_limit {
590+
should_cache = false;
591+
}
592+
} else {
593+
// For future heights, check absolute round limit
594+
if msg_round > self.consensus_config.static_config.future_height_round_limit {
595+
should_cache = false;
596+
}
597+
}
598+
599+
if !should_cache {
600+
warn!(
601+
"Dropping {} for height={} round={} when current_height={} current_round={} - \
602+
limits: future_height={}, future_height_round={}, future_round={}",
603+
msg_description,
604+
msg_height,
605+
msg_round,
606+
current_height.0,
607+
current_round,
608+
self.consensus_config.static_config.future_height_limit,
609+
self.consensus_config.static_config.future_height_round_limit,
610+
self.consensus_config.static_config.future_round_limit
611+
);
612+
}
613+
614+
should_cache
615+
}
616+
617+
fn should_cache_proposal(
618+
&self,
619+
current_height: &BlockNumber,
620+
current_round: u32,
621+
proposal: &ProposalInit,
622+
) -> bool {
623+
self.should_cache_msg(
624+
current_height,
625+
current_round,
626+
proposal.height.0,
627+
proposal.round,
628+
"proposal",
629+
)
630+
}
631+
632+
fn should_cache_vote(
633+
&self,
634+
current_height: &BlockNumber,
635+
current_round: u32,
636+
vote: &Vote,
637+
) -> bool {
638+
self.should_cache_msg(current_height, current_round, vote.height, vote.round, "vote")
639+
}
551640
}

crates/apollo_consensus/src/manager_test.rs

Lines changed: 119 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -335,6 +335,125 @@ async fn timely_message_handling(consensus_config: ConsensusConfig) {
335335
assert!(vote_sender.send((vote.clone(), metadata.clone())).now_or_never().is_some());
336336
}
337337

338+
#[rstest]
339+
#[tokio::test]
340+
async fn future_height_limit_caching_and_dropping(mut consensus_config: ConsensusConfig) {
341+
// Use very low limit - only cache 1 height ahead with round 0.
342+
consensus_config.static_config.future_height_limit = 1;
343+
consensus_config.static_config.future_round_limit = 0;
344+
consensus_config.static_config.future_height_round_limit = 0;
345+
346+
let TestSubscriberChannels { mock_network, subscriber_channels } =
347+
mock_register_broadcast_topic().unwrap();
348+
let mut sender = mock_network.broadcasted_messages_sender;
349+
350+
let (mut proposal_receiver_sender, mut proposal_receiver_receiver) =
351+
mpsc::channel(CHANNEL_SIZE);
352+
353+
// Send proposal and votes for height 2 (should be dropped when processing height 0).
354+
send_proposal(
355+
&mut proposal_receiver_sender,
356+
vec![TestProposalPart::Init(proposal_init(2, 0, *PROPOSER_ID))],
357+
)
358+
.await;
359+
send(&mut sender, prevote(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
360+
send(&mut sender, precommit(Some(Felt::TWO), 2, 0, *PROPOSER_ID)).await;
361+
362+
// Send proposal and votes for height 1 (should be cached when processing height 0).
363+
send_proposal(
364+
&mut proposal_receiver_sender,
365+
vec![TestProposalPart::Init(proposal_init(1, 0, *PROPOSER_ID))],
366+
)
367+
.await;
368+
send(&mut sender, prevote(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
369+
send(&mut sender, precommit(Some(Felt::ONE), 1, 0, *PROPOSER_ID)).await;
370+
371+
// Send proposal and votes for height 0 (current height - needed to reach consensus).
372+
send_proposal(
373+
&mut proposal_receiver_sender,
374+
vec![TestProposalPart::Init(proposal_init(0, 0, *PROPOSER_ID))],
375+
)
376+
.await;
377+
send(&mut sender, prevote(Some(Felt::ZERO), 0, 0, *PROPOSER_ID)).await;
378+
send(&mut sender, precommit(Some(Felt::ZERO), 0, 0, *PROPOSER_ID)).await;
379+
380+
let mut context = MockTestContext::new();
381+
context.expect_try_sync().returning(|_| false);
382+
expect_validate_proposal(&mut context, Felt::ZERO, 1); // Height 0 validation
383+
expect_validate_proposal(&mut context, Felt::ONE, 1); // Height 1 validation
384+
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
385+
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
386+
context.expect_set_height_and_round().returning(move |_, _| ());
387+
// Set up coordination to detect when node votes Nil for height 2 (indicating proposal was
388+
// dropped, so the node didn't received the proposal and votes Nil).
389+
let (height2_nil_vote_trigger, height2_nil_vote_wait) = oneshot::channel();
390+
context
391+
.expect_broadcast()
392+
.withf(move |vote: &Vote| vote.height == 2 && vote.proposal_commitment.is_none())
393+
.times(1)
394+
.return_once(move |_| {
395+
height2_nil_vote_trigger.send(()).unwrap();
396+
Ok(())
397+
});
398+
// Handle all other broadcasts normally.
399+
context.expect_broadcast().returning(move |_| Ok(()));
400+
401+
let mut manager = MultiHeightManager::new(consensus_config, QuorumType::Byzantine);
402+
let mut subscriber_channels = subscriber_channels.into();
403+
404+
// Run height 0 - should drop height 2 messages, cache height 1 messages, and reach consensus.
405+
let decision = manager
406+
.run_height(
407+
&mut context,
408+
BlockNumber(0),
409+
false,
410+
&mut subscriber_channels,
411+
&mut proposal_receiver_receiver,
412+
)
413+
.await
414+
.unwrap();
415+
assert_decision(decision, Felt::ZERO);
416+
417+
// Run height 1 - should succeed using cached proposal.
418+
let decision = manager
419+
.run_height(
420+
&mut context,
421+
BlockNumber(1),
422+
false,
423+
&mut subscriber_channels,
424+
&mut proposal_receiver_receiver,
425+
)
426+
.await
427+
.unwrap();
428+
assert_decision(decision, Felt::ONE);
429+
430+
// Run height 2 in background - shouldn't reach consensus because proposal was dropped.
431+
let manager_handle = tokio::spawn(async move {
432+
manager
433+
.run_height(
434+
&mut context,
435+
BlockNumber(2),
436+
false,
437+
&mut subscriber_channels,
438+
&mut proposal_receiver_receiver,
439+
)
440+
.await
441+
});
442+
443+
// Race between consensus completing and height2_nil_vote_trigger being fired.
444+
tokio::select! {
445+
_ = height2_nil_vote_wait => {
446+
// SUCCESS: height2_nil_vote_trigger was fired - this means the proposal was dropped as
447+
// expected, and the node didn't receive the proposal and votes Nil.
448+
}
449+
consensus_result = manager_handle => {
450+
panic!("FAIL: Node should not reach consensus. {consensus_result:?}");
451+
}
452+
}
453+
}
454+
455+
// TODO(Asmaa): Add test for current height future round limit.
456+
338457
#[rstest]
339458
#[tokio::test]
340459
async fn run_consensus_dynamic_client_updates_validator_between_heights(

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -172,6 +172,10 @@ impl SingleHeightConsensus {
172172
}
173173
}
174174

175+
/// The round the state machine is currently in for this height.
/// Used by the manager to bound how far ahead (in rounds) incoming messages may be.
pub(crate) fn current_round(&self) -> Round {
176+
self.state_machine.round()
177+
}
178+
175179
#[instrument(skip_all)]
176180
pub(crate) async fn start<ContextT: ConsensusContext>(
177181
&mut self,

0 commit comments

Comments
 (0)