Skip to content

Commit fe45e45

Browse files
apollo_consensus: dynamic timeout (#10047)
* apollo_consensus: dynamic timeout
* apollo_consensus: add limit to timeouts (#10048)
* apollo_consensus: add limit to timeouts
* apollo_consensus: split TimeoutsConfig into Base/Delta/Max (#10054)
1 parent 1aa9a2f commit fe45e45

File tree

10 files changed

+397
-94
lines changed

10 files changed

+397
-94
lines changed

config/papyrus/default_config.json

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,20 +109,50 @@
109109
"privacy": "Public",
110110
"value": 1.0
111111
},
112-
"consensus.dynamic_config.timeouts.precommit_timeout": {
113-
"description": "The timeout (seconds) for a precommit.",
112+
"consensus.dynamic_config.timeouts.precommit.base": {
113+
"description": "The base timeout (seconds).",
114114
"privacy": "Public",
115115
"value": 1.0
116116
},
117-
"consensus.dynamic_config.timeouts.prevote_timeout": {
118-
"description": "The timeout (seconds) for a prevote.",
117+
"consensus.dynamic_config.timeouts.precommit.delta": {
118+
"description": "The per-round timeout delta (seconds).",
119+
"privacy": "Public",
120+
"value": 0.5
121+
},
122+
"consensus.dynamic_config.timeouts.precommit.max": {
123+
"description": "The maximum timeout (seconds).",
124+
"privacy": "Public",
125+
"value": 5.0
126+
},
127+
"consensus.dynamic_config.timeouts.prevote.base": {
128+
"description": "The base timeout (seconds).",
129+
"privacy": "Public",
130+
"value": 0.3
131+
},
132+
"consensus.dynamic_config.timeouts.prevote.delta": {
133+
"description": "The per-round timeout delta (seconds).",
134+
"privacy": "Public",
135+
"value": 0.1
136+
},
137+
"consensus.dynamic_config.timeouts.prevote.max": {
138+
"description": "The maximum timeout (seconds).",
119139
"privacy": "Public",
120140
"value": 1.0
121141
},
122-
"consensus.dynamic_config.timeouts.proposal_timeout": {
123-
"description": "The timeout (seconds) for a proposal.",
142+
"consensus.dynamic_config.timeouts.proposal.base": {
143+
"description": "The base timeout (seconds).",
144+
"privacy": "Public",
145+
"value": 9.1
146+
},
147+
"consensus.dynamic_config.timeouts.proposal.delta": {
148+
"description": "The per-round timeout delta (seconds).",
149+
"privacy": "Public",
150+
"value": 0.0
151+
},
152+
"consensus.dynamic_config.timeouts.proposal.max": {
153+
"description": "The maximum timeout (seconds).",
124154
"privacy": "Public",
125-
"value": 3.0
155+
"value": 15.0
126156
},
127157
"consensus.dynamic_config.validator_id": {
128158
"description": "The validator id of the node.",

crates/apollo_consensus/src/manager_test.rs

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use apollo_consensus_config::config::{
88
ConsensusDynamicConfig,
99
ConsensusStaticConfig,
1010
FutureMsgLimitsConfig,
11+
Timeout,
1112
TimeoutsConfig,
1213
};
1314
use apollo_network::network_manager::test_utils::{
@@ -38,11 +39,26 @@ lazy_static! {
3839
static ref VALIDATOR_ID: ValidatorId = (DEFAULT_VALIDATOR_ID + 1).into();
3940
static ref VALIDATOR_ID_2: ValidatorId = (DEFAULT_VALIDATOR_ID + 2).into();
4041
static ref VALIDATOR_ID_3: ValidatorId = (DEFAULT_VALIDATOR_ID + 3).into();
41-
static ref TIMEOUTS: TimeoutsConfig = TimeoutsConfig {
42-
prevote_timeout: Duration::from_millis(100),
43-
precommit_timeout: Duration::from_millis(100),
44-
proposal_timeout: Duration::from_millis(100),
45-
};
42+
static ref TIMEOUTS: TimeoutsConfig = TimeoutsConfig::new(
43+
// proposal
44+
Timeout::new(
45+
Duration::from_millis(100),
46+
Duration::from_millis(10),
47+
Duration::from_millis(1000)
48+
),
49+
// prevote
50+
Timeout::new(
51+
Duration::from_millis(100),
52+
Duration::from_millis(10),
53+
Duration::from_millis(500)
54+
),
55+
// precommit
56+
Timeout::new(
57+
Duration::from_millis(100),
58+
Duration::from_millis(10),
59+
Duration::from_millis(500)
60+
)
61+
);
4662
}
4763

4864
const CHANNEL_SIZE: usize = 10;

crates/apollo_consensus/src/single_height_consensus.rs

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ impl SingleHeightConsensus {
214214
warn!("Round {} already has a proposal, ignoring", init.round);
215215
return Ok(ShcReturn::Tasks(Vec::new()));
216216
};
217-
let timeout = self.timeouts.proposal_timeout;
217+
let timeout = self.timeouts.get_proposal_timeout(init.round);
218218
info!(
219219
"Accepting {init:?}. node_round: {}, timeout: {timeout:?}",
220220
self.state_machine.round()
@@ -242,7 +242,7 @@ impl SingleHeightConsensus {
242242
| StateMachineEvent::TimeoutPrecommit(_round) => {
243243
self.handle_timeout(context, event).await
244244
}
245-
StateMachineEvent::Prevote(proposal_id, round) => {
245+
StateMachineEvent::Prevote(_proposal_id, round) => {
246246
let Some(last_vote) = &self.last_prevote else {
247247
return Err(ConsensusError::InternalInconsistency(
248248
"No prevote to send".to_string(),
@@ -255,11 +255,11 @@ impl SingleHeightConsensus {
255255
trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}");
256256
context.broadcast(last_vote.clone()).await?;
257257
Ok(ShcReturn::Tasks(vec![ShcTask::Prevote(
258-
self.timeouts.prevote_timeout,
259-
StateMachineEvent::Prevote(proposal_id, round),
258+
self.timeouts.get_prevote_timeout(0),
259+
event,
260260
)]))
261261
}
262-
StateMachineEvent::Precommit(proposal_id, round) => {
262+
StateMachineEvent::Precommit(_proposal_id, round) => {
263263
let Some(last_vote) = &self.last_precommit else {
264264
return Err(ConsensusError::InternalInconsistency(
265265
"No precommit to send".to_string(),
@@ -272,8 +272,8 @@ impl SingleHeightConsensus {
272272
trace_every_n_sec!(REBROADCAST_LOG_PERIOD_SECS, "Rebroadcasting {last_vote:?}");
273273
context.broadcast(last_vote.clone()).await?;
274274
Ok(ShcReturn::Tasks(vec![ShcTask::Precommit(
275-
self.timeouts.precommit_timeout,
276-
StateMachineEvent::Precommit(proposal_id, round),
275+
self.timeouts.get_precommit_timeout(0),
276+
event,
277277
)]))
278278
}
279279
StateMachineEvent::Proposal(proposal_id, round, valid_round) => {
@@ -301,10 +301,7 @@ impl SingleHeightConsensus {
301301
old.is_some_and(|p| p.is_none()),
302302
"Proposal entry for round {round} should exist and be empty: {old:?}"
303303
);
304-
let sm_events = self.state_machine.handle_event(
305-
StateMachineEvent::Proposal(proposal_id, round, valid_round),
306-
&leader_fn,
307-
);
304+
let sm_events = self.state_machine.handle_event(event, &leader_fn);
308305
self.handle_state_machine_events(context, sm_events).await
309306
}
310307
StateMachineEvent::GetProposal(proposal_id, round) => {
@@ -321,9 +318,7 @@ impl SingleHeightConsensus {
321318
debug!(%round, proposal_commitment = ?proposal_id, "Built proposal.");
322319
let leader_fn =
323320
|round: Round| -> ValidatorId { context.proposer(self.height, round) };
324-
let sm_events = self
325-
.state_machine
326-
.handle_event(StateMachineEvent::GetProposal(proposal_id, round), &leader_fn);
321+
let sm_events = self.state_machine.handle_event(event, &leader_fn);
327322
self.handle_state_machine_events(context, sm_events).await
328323
}
329324
_ => unimplemented!("Unexpected event: {:?}", event),
@@ -434,14 +429,23 @@ impl SingleHeightConsensus {
434429
.await?,
435430
);
436431
}
437-
StateMachineEvent::TimeoutPropose(_) => {
438-
ret_val.push(ShcTask::TimeoutPropose(self.timeouts.proposal_timeout, event));
432+
StateMachineEvent::TimeoutPropose(round) => {
433+
ret_val.push(ShcTask::TimeoutPropose(
434+
self.timeouts.get_proposal_timeout(round),
435+
event,
436+
));
439437
}
440-
StateMachineEvent::TimeoutPrevote(_) => {
441-
ret_val.push(ShcTask::TimeoutPrevote(self.timeouts.prevote_timeout, event));
438+
StateMachineEvent::TimeoutPrevote(round) => {
439+
ret_val.push(ShcTask::TimeoutPrevote(
440+
self.timeouts.get_prevote_timeout(round),
441+
event,
442+
));
442443
}
443-
StateMachineEvent::TimeoutPrecommit(_) => {
444-
ret_val.push(ShcTask::TimeoutPrecommit(self.timeouts.precommit_timeout, event));
444+
StateMachineEvent::TimeoutPrecommit(round) => {
445+
ret_val.push(ShcTask::TimeoutPrecommit(
446+
self.timeouts.get_precommit_timeout(round),
447+
event,
448+
));
445449
}
446450
}
447451
}
@@ -466,7 +470,12 @@ impl SingleHeightConsensus {
466470
let init =
467471
ProposalInit { height: self.height, round, proposer: self.id, valid_round: None };
468472
CONSENSUS_BUILD_PROPOSAL_TOTAL.increment(1);
469-
let fin_receiver = context.build_proposal(init, self.timeouts.proposal_timeout).await;
473+
// TODO(Asmaa): Reconsider: we should keep the builder's timeout bounded independently of
474+
// the consensus proposal timeout. We currently use the base (round 0) proposal
475+
// timeout for building to avoid giving the Batcher more time when proposal time is
476+
// extended for consensus.
477+
let build_timeout = self.timeouts.get_proposal_timeout(0);
478+
let fin_receiver = context.build_proposal(init, build_timeout).await;
470479
vec![ShcTask::BuildProposal(round, fin_receiver)]
471480
}
472481

@@ -511,20 +520,19 @@ impl SingleHeightConsensus {
511520
round: Round,
512521
vote_type: VoteType,
513522
) -> Result<Vec<ShcTask>, ConsensusError> {
523+
let prevote_timeout = self.timeouts.get_prevote_timeout(round);
524+
let precommit_timeout = self.timeouts.get_precommit_timeout(round);
514525
let (votes, last_vote, task) = match vote_type {
515526
VoteType::Prevote => (
516527
&mut self.prevotes,
517528
&mut self.last_prevote,
518-
ShcTask::Prevote(
519-
self.timeouts.prevote_timeout,
520-
StateMachineEvent::Prevote(proposal_id, round),
521-
),
529+
ShcTask::Prevote(prevote_timeout, StateMachineEvent::Prevote(proposal_id, round)),
522530
),
523531
VoteType::Precommit => (
524532
&mut self.precommits,
525533
&mut self.last_precommit,
526534
ShcTask::Precommit(
527-
self.timeouts.precommit_timeout,
535+
precommit_timeout,
528536
StateMachineEvent::Precommit(proposal_id, round),
529537
),
530538
),

crates/apollo_consensus/src/single_height_consensus_test.rs

Lines changed: 106 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use apollo_consensus_config::config::TimeoutsConfig;
1+
use std::time::Duration;
2+
3+
use apollo_consensus_config::config::{Timeout, TimeoutsConfig};
24
use apollo_protobuf::consensus::{ProposalFin, ProposalInit, Vote, DEFAULT_VALIDATOR_ID};
35
use futures::channel::{mpsc, oneshot};
46
use futures::SinkExt;
@@ -41,28 +43,29 @@ fn get_proposal_init_for_height(height: BlockNumber) -> ProposalInit {
4143
}
4244

4345
fn prevote_task(block_felt: Option<Felt>, round: u32) -> ShcTask {
46+
let duration = TIMEOUTS.get_prevote_timeout(round);
4447
ShcTask::Prevote(
45-
TIMEOUTS.prevote_timeout,
48+
duration,
4649
StateMachineEvent::Prevote(block_felt.map(ProposalCommitment), round),
4750
)
4851
}
4952

5053
fn precommit_task(block_felt: Option<Felt>, round: u32) -> ShcTask {
54+
let duration = TIMEOUTS.get_precommit_timeout(round);
5155
ShcTask::Precommit(
52-
TIMEOUTS.precommit_timeout,
56+
duration,
5357
StateMachineEvent::Precommit(block_felt.map(ProposalCommitment), round),
5458
)
5559
}
5660

5761
fn timeout_prevote_task(round: u32) -> ShcTask {
58-
ShcTask::TimeoutPrevote(TIMEOUTS.prevote_timeout, StateMachineEvent::TimeoutPrevote(round))
62+
let duration = TIMEOUTS.get_prevote_timeout(round);
63+
ShcTask::TimeoutPrevote(duration, StateMachineEvent::TimeoutPrevote(round))
5964
}
6065

6166
fn timeout_precommit_task(round: u32) -> ShcTask {
62-
ShcTask::TimeoutPrecommit(
63-
TIMEOUTS.precommit_timeout,
64-
StateMachineEvent::TimeoutPrecommit(round),
65-
)
67+
let duration = TIMEOUTS.get_precommit_timeout(round);
68+
ShcTask::TimeoutPrecommit(duration, StateMachineEvent::TimeoutPrecommit(round))
6669
}
6770

6871
async fn handle_proposal(
@@ -445,3 +448,98 @@ async fn repropose() {
445448
assert_eq!(decision.block, BLOCK.id);
446449
assert!(decision.precommits.into_iter().all(|item| precommits.contains(&item)));
447450
}
451+
452+
#[tokio::test]
453+
async fn shc_applies_proposal_timeouts_across_rounds() {
454+
let mut context = MockTestContext::new();
455+
456+
let timeouts = TimeoutsConfig::new(
457+
// Proposal timeouts: base=100ms, delta=60ms, max=200ms.
458+
Timeout::new(
459+
Duration::from_millis(100),
460+
Duration::from_millis(60),
461+
Duration::from_millis(200),
462+
),
463+
// unused prevote and precommit timeouts.
464+
Timeout::new(
465+
Duration::from_millis(100),
466+
Duration::from_millis(10),
467+
Duration::from_millis(150),
468+
),
469+
Timeout::new(
470+
Duration::from_millis(100),
471+
Duration::from_millis(10),
472+
Duration::from_millis(150),
473+
),
474+
);
475+
476+
let mut shc = SingleHeightConsensus::new(
477+
BlockNumber(0),
478+
false,
479+
*PROPOSER_ID,
480+
VALIDATORS.to_vec(),
481+
QuorumType::Byzantine,
482+
timeouts.clone(),
483+
);
484+
// context expectations.
485+
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
486+
context.expect_set_height_and_round().returning(move |_, _| ());
487+
488+
// Round 0 validate: timeout = min(100 + 0*60, 200) = 100ms (base).
489+
let expected_validate_timeout_r0 = Duration::from_millis(100);
490+
context
491+
.expect_validate_proposal()
492+
.times(1)
493+
.withf(move |init, timeout, _| init.round == 0 && *timeout == expected_validate_timeout_r0)
494+
.returning(move |_, _, _| {
495+
let (tx, rx) = oneshot::channel();
496+
tx.send(BLOCK.id).ok();
497+
rx
498+
});
499+
500+
let (mut content_tx, content_rx) = mpsc::channel(CHANNEL_SIZE);
501+
content_tx.send(TestProposalPart::Init(ProposalInit::default())).await.unwrap();
502+
let _ = shc.handle_proposal(&mut context, ProposalInit::default(), content_rx).await.unwrap();
503+
504+
// Round 1 validate: timeout = min(100 + 1*60, 200) = 160ms.
505+
let expected_validate_timeout_r1 = Duration::from_millis(160);
506+
context
507+
.expect_validate_proposal()
508+
.times(1)
509+
.withf(move |init, timeout, _| init.round == 1 && *timeout == expected_validate_timeout_r1)
510+
.returning(move |_, _, _| {
511+
let (tx, rx) = oneshot::channel();
512+
tx.send(BLOCK.id).ok();
513+
rx
514+
});
515+
let (mut content_tx2, content_rx2) = mpsc::channel(CHANNEL_SIZE);
516+
let round1_init = ProposalInit {
517+
height: BlockNumber(0),
518+
round: 1,
519+
proposer: *PROPOSER_ID,
520+
..Default::default()
521+
};
522+
content_tx2.send(TestProposalPart::Init(round1_init)).await.unwrap();
523+
let _ = shc.handle_proposal(&mut context, round1_init, content_rx2).await.unwrap();
524+
525+
// Round 2 validate: timeout = min(100 + 2*60, 200) = 200ms (capped).
526+
let expected_validate_timeout_r2 = Duration::from_millis(200);
527+
context
528+
.expect_validate_proposal()
529+
.times(1)
530+
.withf(move |init, timeout, _| init.round == 2 && *timeout == expected_validate_timeout_r2)
531+
.returning(move |_, _, _| {
532+
let (tx, rx) = oneshot::channel();
533+
tx.send(BLOCK.id).ok();
534+
rx
535+
});
536+
let (mut content_tx3, content_rx3) = mpsc::channel(CHANNEL_SIZE);
537+
let round2_init = ProposalInit {
538+
height: BlockNumber(0),
539+
round: 2,
540+
proposer: *PROPOSER_ID,
541+
..Default::default()
542+
};
543+
content_tx3.send(TestProposalPart::Init(round2_init)).await.unwrap();
544+
let _ = shc.handle_proposal(&mut context, round2_init, content_rx3).await.unwrap();
545+
}

0 commit comments

Comments
 (0)