Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 53 additions & 24 deletions rs/consensus/src/consensus/share_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ use ic_consensus_utils::{
registry_version_at_height,
};
use ic_interfaces::messaging::MessageRouting;
use ic_logger::ReplicaLogger;
use ic_logger::{ReplicaLogger, debug, warn};
use ic_types::{
Height,
consensus::{
CatchUpContent, ConsensusMessage, ConsensusMessageHashable, FinalizationContent, HasHeight,
RandomTapeContent,
Block, CatchUpContent, CatchUpPackage, ConsensusMessage, ConsensusMessageHashable,
FinalizationContent, HasCommittee, HasHeight, RandomTapeContent,
},
crypto::Signed,
};
use std::{cmp::min, sync::Arc};

Expand Down Expand Up @@ -135,27 +134,22 @@ impl ShareAggregator {
let current_cup_height = pool.get_catch_up_height();

while start_block.height() > current_cup_height {
let height = start_block.height();
let shares = pool.get_catch_up_package_shares(height).map(|share| {
let block = pool
.get_block(&share.content.block, height)
.unwrap_or_else(|err| panic!("Block not found for {share:?}, error: {err:?}"));
Signed {
content: CatchUpContent::from_share_content(share.content, block.into_inner()),
signature: share.signature,
match self.aggregate_catch_up_package_shares_for_summary_block(pool, &start_block) {
Ok(Some(cup)) => return vec![ConsensusMessage::CatchUpPackage(cup)],
Ok(None) => {
debug!(
self.log,
"Not enough shares to be able to create a full CUP at height {}",
start_block.height()
)
}
Err(err) => {
warn!(
self.log,
"Encountered an error while aggregating CUP shares at height {}: {err}",
start_block.height()
);
}
});
let state_reader = pool.as_cache();
let dkg_id = active_high_threshold_nidkg_id(state_reader, height);
let result = aggregate(
&self.log,
self.membership.as_ref(),
self.crypto.as_aggregate(),
Box::new(|_| dkg_id.clone()),
shares,
);
if !result.is_empty() {
return to_messages(result);
}

let Some(block_from_last_interval) =
Expand All @@ -177,6 +171,41 @@ impl ShareAggregator {
}
Vec::new()
}

fn aggregate_catch_up_package_shares_for_summary_block(
&self,
pool: &PoolReader<'_>,
summary_block: &Block,
) -> Result<Option<CatchUpPackage>, String> {
let height = summary_block.height();
let shares = pool.get_catch_up_package_shares(height).collect::<Vec<_>>();

let threshold = self
.membership
.get_committee_threshold(height, CatchUpPackage::committee())
.map_err(|err| format!("Failed to get the committee threshold: {err:?}"))?;

if shares.len() < threshold {
return Ok(None);
}

let dkg_id = active_high_threshold_nidkg_id(pool.as_cache(), height)
.ok_or_else(|| String::from("Couldn't get the high dkg id"))?;
let cup_content =
CatchUpContent::from_share_content(shares[0].content.clone(), summary_block.clone());
let signatures = shares.iter().map(|share| &share.signature).collect();

let cup = self
.crypto
.aggregate(signatures, dkg_id)
.map_err(|err| format!("Failed to aggregate shares: {err}"))
.map(|signature| CatchUpPackage {
content: cup_content,
signature,
})?;

Ok(Some(cup))
}
}

fn to_messages<T: ConsensusMessageHashable>(artifacts: Vec<T>) -> Vec<ConsensusMessage> {
Expand Down
Loading