Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 10 additions & 84 deletions crates/apollo_central_sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,12 @@ impl<
| StateSyncError::CentralSourceError(_)
| StateSyncError::PendingSourceError(_)
| StateSyncError::BaseLayerSourceError(_)
| StateSyncError::ParentBlockHashMismatch { .. }
| StateSyncError::BaseLayerHashMismatch { .. }
| StateSyncError::ClassManagerClientError(_)
| StateSyncError::BaseLayerBlockWithoutMatchingHeader { .. }
| StateSyncError::JoinError(_) => true,
StateSyncError::SequencerPubKeyChanged { .. } => false,
StateSyncError::SequencerPubKeyChanged { .. }
| StateSyncError::ParentBlockHashMismatch { .. } => false,
}
}
}
Expand Down Expand Up @@ -253,14 +253,12 @@ impl<
}

// Sync until encountering an error:
// 1. If needed, revert blocks from the end of the chain.
// 2. Create infinite block and state diff streams to fetch data from the central source.
// 3. Fetch data from the streams with unblocking wait while there is no new data.
// 1. Create infinite block and state diff streams to fetch data from the central source.
// 2. Fetch data from the streams with unblocking wait while there is no new data.
async fn sync_while_ok(&mut self) -> StateSyncResult {
if self.config.verify_blocks {
self.track_sequencer_public_key_changes().await?;
}
self.handle_block_reverts().await?;
let block_stream = stream_new_blocks(
self.reader.clone(),
self.central_source.clone(),
Expand Down Expand Up @@ -384,8 +382,8 @@ impl<
block: Block,
signature: BlockSignature,
) -> StateSyncResult {
// Assuming the central source is trusted, detect reverts by comparing the incoming block's
// parent hash to the current hash.
// Assuming the central source is trusted, checks incoming block's parent hash against the
// parent hash stored in the storage.
self.verify_parent_block_hash(block_number, &block)?;

debug!("Storing block number: {block_number}, block header: {:?}", block.header);
Expand Down Expand Up @@ -635,18 +633,13 @@ impl<
}

#[instrument(skip(self), level = "debug", err)]
// In case of a mismatch between the base layer and l2, an error will be returned, then the
// sync will revert blocks if needed based on the l2 central source. This approach works as long
// as l2 is trusted so all the reverts can be detect by using it.
async fn store_base_layer_block(
&mut self,
block_number: BlockNumber,
block_hash: BlockHash,
) -> StateSyncResult {
self.perform_storage_writes(move |writer| {
let txn = writer.begin_rw_txn()?;
// Missing header can be because of a base layer reorg, the matching header may be
// reverted.
let expected_hash = txn
.get_block_header(block_number)?
.ok_or(StateSyncError::BaseLayerBlockWithoutMatchingHeader { block_number })?
Expand Down Expand Up @@ -692,11 +685,11 @@ impl<
.block_hash;

if prev_hash != block.header.block_header_without_hash.parent_hash {
// A revert detected, log and restart sync loop.
// Block parent hash mismatch, log and restart sync loop.
warn!(
"Detected revert while processing block {}. Parent hash of the incoming block is \
{}, current block hash is {}.",
block_number, block.header.block_header_without_hash.parent_hash, prev_hash
"Parent hash of the incoming block {block_number} is {}, current block hash is \
{prev_hash}.",
block.header.block_header_without_hash.parent_hash
);
CENTRAL_SYNC_FORKS_FROM_FEEDER.increment(1);
return Err(StateSyncError::ParentBlockHashMismatch {
Expand All @@ -709,73 +702,6 @@ impl<
Ok(())
}

// Reverts data if needed.
async fn handle_block_reverts(&mut self) -> Result<(), StateSyncError> {
debug!("Handling block reverts.");
let header_marker = self.reader.begin_ro_txn()?.get_header_marker()?;

// Revert last blocks if needed.
let mut last_block_in_storage = header_marker.prev();
while let Some(block_number) = last_block_in_storage {
if self.should_revert_block(block_number).await? {
self.revert_block(block_number).await?;
last_block_in_storage = block_number.prev();
} else {
break;
}
}
Ok(())
}

// TODO(dan): update necessary metrics.
// Deletes the block data from the storage.
#[allow(clippy::expect_fun_call)]
#[instrument(skip(self), level = "debug", err)]
async fn revert_block(&mut self, block_number: BlockNumber) -> StateSyncResult {
debug!("Reverting block.");

self.perform_storage_writes(move |writer| {
let mut txn = writer.begin_rw_txn()?;
txn = txn.try_revert_base_layer_marker(block_number)?;
let res = txn.revert_header(block_number)?;
txn = res.0;
let mut reverted_block_hash: Option<BlockHash> = None;
if let Some(header) = res.1 {
reverted_block_hash = Some(header.block_hash);

let res = txn.revert_body(block_number)?;
txn = res.0;

let res = txn.revert_state_diff(block_number)?;
txn = res.0;
}

txn.commit()?;
if let Some(hash) = reverted_block_hash {
info!(%hash, %block_number, "Reverted block.");
}
Ok(())
})
.await
}

/// Checks if centrals block hash at the block number is different from ours (or doesn't exist).
/// If so, a revert is required.
async fn should_revert_block(&self, block_number: BlockNumber) -> Result<bool, StateSyncError> {
if let Some(central_block_hash) = self.central_source.get_block_hash(block_number).await? {
let storage_block_header =
self.reader.begin_ro_txn()?.get_block_header(block_number)?;

match storage_block_header {
Some(block_header) => Ok(block_header.block_hash != central_block_hash),
None => Ok(false),
}
} else {
// Block number doesn't exist in central, revert.
Ok(true)
}
}

async fn perform_storage_writes<
F: FnOnce(&mut StorageWriter) -> Result<(), StateSyncError> + Send + 'static,
>(
Expand Down
Loading
Loading