diff --git a/crates/apollo_central_sync/src/lib.rs b/crates/apollo_central_sync/src/lib.rs index 7414439a229..b95e8e81fad 100644 --- a/crates/apollo_central_sync/src/lib.rs +++ b/crates/apollo_central_sync/src/lib.rs @@ -217,12 +217,12 @@ impl< | StateSyncError::CentralSourceError(_) | StateSyncError::PendingSourceError(_) | StateSyncError::BaseLayerSourceError(_) - | StateSyncError::ParentBlockHashMismatch { .. } | StateSyncError::BaseLayerHashMismatch { .. } | StateSyncError::ClassManagerClientError(_) | StateSyncError::BaseLayerBlockWithoutMatchingHeader { .. } | StateSyncError::JoinError(_) => true, - StateSyncError::SequencerPubKeyChanged { .. } => false, + StateSyncError::SequencerPubKeyChanged { .. } + | StateSyncError::ParentBlockHashMismatch { .. } => false, } } } @@ -253,14 +253,12 @@ impl< } // Sync until encountering an error: - // 1. If needed, revert blocks from the end of the chain. - // 2. Create infinite block and state diff streams to fetch data from the central source. - // 3. Fetch data from the streams with unblocking wait while there is no new data. + // 1. Create infinite block and state diff streams to fetch data from the central source. + // 2. Fetch data from the streams with unblocking wait while there is no new data. async fn sync_while_ok(&mut self) -> StateSyncResult { if self.config.verify_blocks { self.track_sequencer_public_key_changes().await?; } - self.handle_block_reverts().await?; let block_stream = stream_new_blocks( self.reader.clone(), self.central_source.clone(), @@ -384,8 +382,8 @@ impl< block: Block, signature: BlockSignature, ) -> StateSyncResult { - // Assuming the central source is trusted, detect reverts by comparing the incoming block's - // parent hash to the current hash. + // To prevent cases where central has forked under our feet, check incoming block's parent + // hash against the parent hash stored in the storage. self.verify_parent_block_hash(block_number, &block)?; debug!("Storing block number: {block_number}, block header: {:?}", block.header); @@ -635,9 +633,6 @@ impl< } #[instrument(skip(self), level = "debug", err)] - // In case of a mismatch between the base layer and l2, an error will be returned, then the - // sync will revert blocks if needed based on the l2 central source. This approach works as long - // as l2 is trusted so all the reverts can be detect by using it. async fn store_base_layer_block( &mut self, block_number: BlockNumber, @@ -645,8 +640,6 @@ impl< ) -> StateSyncResult { self.perform_storage_writes(move |writer| { let txn = writer.begin_rw_txn()?; - // Missing header can be because of a base layer reorg, the matching header may be - // reverted. let expected_hash = txn .get_block_header(block_number)? .ok_or(StateSyncError::BaseLayerBlockWithoutMatchingHeader { block_number })? @@ -692,11 +685,11 @@ impl< .block_hash; if prev_hash != block.header.block_header_without_hash.parent_hash { - // A revert detected, log and restart sync loop. + // Block parent hash mismatch, log and restart sync loop. warn!( - "Detected revert while processing block {}. Parent hash of the incoming block is \ - {}, current block hash is {}.", - block_number, block.header.block_header_without_hash.parent_hash, prev_hash + "Detected reorg in central. block number {block_number} had hash {prev_hash} but \ + the next block's parent hash is {}.", + block.header.block_header_without_hash.parent_hash ); CENTRAL_SYNC_FORKS_FROM_FEEDER.increment(1); return Err(StateSyncError::ParentBlockHashMismatch { @@ -709,73 +702,6 @@ impl< Ok(()) } - // Reverts data if needed. - async fn handle_block_reverts(&mut self) -> Result<(), StateSyncError> { - debug!("Handling block reverts."); - let header_marker = self.reader.begin_ro_txn()?.get_header_marker()?; - - // Revert last blocks if needed. - let mut last_block_in_storage = header_marker.prev(); - while let Some(block_number) = last_block_in_storage { - if self.should_revert_block(block_number).await? { - self.revert_block(block_number).await?; - last_block_in_storage = block_number.prev(); - } else { - break; - } - } - Ok(()) - } - - // TODO(dan): update necessary metrics. - // Deletes the block data from the storage. - #[allow(clippy::expect_fun_call)] - #[instrument(skip(self), level = "debug", err)] - async fn revert_block(&mut self, block_number: BlockNumber) -> StateSyncResult { - debug!("Reverting block."); - - self.perform_storage_writes(move |writer| { - let mut txn = writer.begin_rw_txn()?; - txn = txn.try_revert_base_layer_marker(block_number)?; - let res = txn.revert_header(block_number)?; - txn = res.0; - let mut reverted_block_hash: Option = None; - if let Some(header) = res.1 { - reverted_block_hash = Some(header.block_hash); - - let res = txn.revert_body(block_number)?; - txn = res.0; - - let res = txn.revert_state_diff(block_number)?; - txn = res.0; - } - - txn.commit()?; - if let Some(hash) = reverted_block_hash { - info!(%hash, %block_number, "Reverted block."); - } - Ok(()) - }) - .await - } - - /// Checks if centrals block hash at the block number is different from ours (or doesn't exist). - /// If so, a revert is required. - async fn should_revert_block(&self, block_number: BlockNumber) -> Result { - if let Some(central_block_hash) = self.central_source.get_block_hash(block_number).await? { - let storage_block_header = - self.reader.begin_ro_txn()?.get_block_header(block_number)?; - - match storage_block_header { - Some(block_header) => Ok(block_header.block_hash != central_block_hash), - None => Ok(false), - } - } else { - // Block number doesn't exist in central, revert. - Ok(true) - } - } - async fn perform_storage_writes< F: FnOnce(&mut StorageWriter) -> Result<(), StateSyncError> + Send + 'static, >( diff --git a/crates/apollo_central_sync/src/sources/central_sync_test.rs b/crates/apollo_central_sync/src/sources/central_sync_test.rs index 6f63eba4cc1..54a47101ec5 100644 --- a/crates/apollo_central_sync/src/sources/central_sync_test.rs +++ b/crates/apollo_central_sync/src/sources/central_sync_test.rs @@ -12,11 +12,10 @@ use apollo_storage::{StorageError, StorageReader, StorageWriter}; use apollo_test_utils::{get_rng, GetTestInstance}; use assert_matches::assert_matches; use async_stream::stream; -use async_trait::async_trait; use cairo_lang_starknet_classes::casm_contract_class::CasmContractClass; use futures::StreamExt; use indexmap::IndexMap; -use papyrus_common::pending_classes::{ApiContractClass, PendingClasses}; +use papyrus_common::pending_classes::PendingClasses; use starknet_api::block::{ Block, BlockBody, @@ -488,315 +487,6 @@ async fn sync_happy_flow() { } } -#[tokio::test] -async fn sync_with_revert() { - let _ = simple_logger::init_with_env(); - let ((reader, writer), _temp_dir) = get_test_storage(); - - // Once the sync reaches N_BLOCKS_BEFORE_REVERT, the check_storage thread will set this flag to - // true to mark the central to simulate a revert, and for the check_storage to start checking - // for the new blocks after the revert. - let reverted_mutex = Arc::new(Mutex::new(false)); - - // Prepare sync thread with mocked central source that will perform a revert once the - // reverted_mutex is true. - let mock = MockedCentralWithRevert { reverted: reverted_mutex.clone() }; - let mut base_layer_mock = MockBaseLayerSourceTrait::new(); - base_layer_mock.expect_latest_proved_block().returning(|| Ok(None)); - let class_manager_client = None; - let sync_future = run_sync( - reader.clone(), - writer, - mock, - base_layer_mock, - get_test_sync_config(false), - class_manager_client, - ); - - // Prepare functions that check that the sync worked up to N_BLOCKS_BEFORE_REVERT and then - // reacted correctly to the revert. - const N_BLOCKS_BEFORE_REVERT: u64 = 8; - // FIXME: (Shahak) analyze and set a lower value. - const MAX_TIME_TO_SYNC_BEFORE_REVERT_MS: u64 = 900; - const CHAIN_FORK_BLOCK_NUMBER: u64 = 5; - const N_BLOCKS_AFTER_REVERT: u64 = 10; - // FIXME: (Omri) analyze and set a lower value. - const MAX_TIME_TO_SYNC_AFTER_REVERT_MS: u64 = 900; - - // Part 1 - check that the storage reached the point at which we will make the revert. - let check_storage_before_revert_future = check_storage( - reader.clone(), - Duration::from_millis(MAX_TIME_TO_SYNC_BEFORE_REVERT_MS), - |reader| { - let marker = reader.begin_ro_txn().unwrap().get_header_marker().unwrap(); - debug!("Before revert, block marker currently at {}", marker); - match marker { - BlockNumber(bn) if bn < N_BLOCKS_BEFORE_REVERT => { - CheckStoragePredicateResult::InProgress - } - BlockNumber(bn) if bn == N_BLOCKS_BEFORE_REVERT => { - CheckStoragePredicateResult::Passed - } - _ => CheckStoragePredicateResult::Error, - } - }, - ); - - // Part 2 - signal the mocked central to simulate a revert. - let signal_revert = async { - debug!("Reverting."); - let mut reverted = reverted_mutex.lock().await; - *reverted = true; - }; - - // Part 3 - check that the storage reverted correctly. - let check_storage_after_revert_future = check_storage( - reader.clone(), - Duration::from_millis(MAX_TIME_TO_SYNC_AFTER_REVERT_MS), - |reader| { - let block_marker = reader.begin_ro_txn().unwrap().get_header_marker().unwrap(); - let state_marker = reader.begin_ro_txn().unwrap().get_state_marker().unwrap(); - debug!( - "Block marker currently at {}, state marker currently at {}.", - block_marker, state_marker - ); - - // We can't check the storage data until the marker reaches N_BLOCKS_AFTER_REVERT - // because we can't know if the revert was already detected in the sync or not. - // Check both markers. - match (block_marker, state_marker) { - (BlockNumber(bm), BlockNumber(sm)) - if bm > N_BLOCKS_AFTER_REVERT || sm > N_BLOCKS_AFTER_REVERT => - { - CheckStoragePredicateResult::Error - } - - (BlockNumber(bm), BlockNumber(sm)) - if bm < N_BLOCKS_AFTER_REVERT || sm < N_BLOCKS_AFTER_REVERT => - { - CheckStoragePredicateResult::InProgress - } - (BlockNumber(bm), BlockNumber(sm)) - if bm == N_BLOCKS_AFTER_REVERT && sm == N_BLOCKS_AFTER_REVERT => - { - // Both blocks and state updates are fully synced, check the data validity. - for bn in BlockNumber(CHAIN_FORK_BLOCK_NUMBER) - .iter_up_to(BlockNumber(N_BLOCKS_AFTER_REVERT)) - { - debug!("checking hash for block {}", bn); - let block_header = - reader.begin_ro_txn().unwrap().get_block_header(bn).unwrap(); - - if block_header.is_none() { - error!("Block {} doesn't exist", bn); - return CheckStoragePredicateResult::Error; - } - let block_hash = block_header.unwrap().block_hash; - let expected_block_hash = create_block_hash(bn, true); - if block_hash != expected_block_hash { - error!( - "Wrong hash for block {}. Got {}, Expected {}.", - bn, block_hash, expected_block_hash - ); - return CheckStoragePredicateResult::Error; - } - - // TODO(Yair): add checks to the state diff. - } - - CheckStoragePredicateResult::Passed - } - _ => unreachable!("Should never happen."), - } - }, - ); - - // Assemble the pieces for the revert flow test. - let check_flow = async { - assert!(check_storage_before_revert_future.await); - signal_revert.await; - assert!(check_storage_after_revert_future.await); - }; - - tokio::select! { - sync_result = sync_future => sync_result.unwrap(), - _ = check_flow => {}, - } - - // Mock central source that performs a revert once the reverted mutex is set to true. - struct MockedCentralWithRevert { - reverted: Arc>, - } - impl MockedCentralWithRevert { - fn revert_happend(&self) -> bool { - match self.reverted.try_lock() { - Ok(reverted) => *reverted, - _ => false, - } - } - } - - #[async_trait] - impl CentralSourceTrait for MockedCentralWithRevert { - async fn get_latest_block(&self) -> Result, CentralError> { - let already_reverted = self.revert_happend(); - let block_number = match already_reverted { - false => BlockNumber(N_BLOCKS_BEFORE_REVERT), - true => BlockNumber(N_BLOCKS_AFTER_REVERT), - } - .prev() - .unwrap(); - Ok(Some(BlockHashAndNumber { - hash: create_block_hash(block_number, already_reverted), - number: block_number, - })) - } - - async fn get_block_hash( - &self, - block_number: BlockNumber, - ) -> Result, CentralError> { - match (self.revert_happend(), block_number) { - (false, BlockNumber(bn)) if bn >= N_BLOCKS_BEFORE_REVERT => Ok(None), - (false, BlockNumber(bn)) if bn < N_BLOCKS_BEFORE_REVERT => { - Ok(Some(create_block_hash(block_number, false))) - } - (true, BlockNumber(bn)) if bn >= N_BLOCKS_AFTER_REVERT => Ok(None), - (true, BlockNumber(bn)) if bn >= CHAIN_FORK_BLOCK_NUMBER => { - Ok(Some(create_block_hash(block_number, true))) - } - (true, BlockNumber(bn)) if bn < CHAIN_FORK_BLOCK_NUMBER => { - Ok(Some(create_block_hash(block_number, false))) - } - _ => unreachable!( - "get_block_hash when Revert happend: {}, bn: {}", - self.revert_happend(), - block_number - ), - } - } - - fn stream_new_blocks( - &self, - initial_block_number: BlockNumber, - up_to_block_number: BlockNumber, - ) -> BlocksStream<'_> { - match self.revert_happend() { - false => stream! { - for i in initial_block_number.iter_up_to(up_to_block_number) { - if i.0 >= N_BLOCKS_BEFORE_REVERT { - yield Err(CentralError::BlockNotFound { block_number: i }); - } - let header = BlockHeader{ - block_hash: create_block_hash(i, false), - block_header_without_hash: BlockHeaderWithoutHash { - block_number: i, - parent_hash: create_block_hash(i.prev().unwrap_or_default(), false), - ..Default::default() - }, - ..Default::default() - }; - yield Ok(( - i, - Block{ header, body: BlockBody::default() }, - BlockSignature::default(), - )); - } - } - .boxed(), - true => stream! { - for i in initial_block_number.iter_up_to(up_to_block_number) { - if i.0 >= N_BLOCKS_AFTER_REVERT { - yield Err(CentralError::BlockNotFound { block_number: i }); - } - let header = BlockHeader { - block_hash: create_block_hash(i, i.0 >= CHAIN_FORK_BLOCK_NUMBER), - block_header_without_hash: BlockHeaderWithoutHash { - block_number: i, - parent_hash: create_block_hash(i.prev().unwrap_or_default(), i.0 > CHAIN_FORK_BLOCK_NUMBER), - ..Default::default() - }, - ..Default::default() - }; - yield Ok(( - i, - Block{header, body: BlockBody::default()}, - BlockSignature::default(), - )); - } - } - .boxed() - } - } - - fn stream_state_updates( - &self, - initial_block_number: BlockNumber, - up_to_block_number: BlockNumber, - ) -> StateUpdatesStream<'_> { - match self.revert_happend() { - false => stream! { - for i in initial_block_number.iter_up_to(up_to_block_number) { - if i.0 >= N_BLOCKS_BEFORE_REVERT { - yield Err(CentralError::BlockNotFound { block_number: i }); - } - yield Ok((i, create_block_hash(i, false), StateDiff::default(), IndexMap::new())); - } - } - .boxed(), - true => stream! { - for i in initial_block_number.iter_up_to(up_to_block_number) { - if i.0 >= N_BLOCKS_AFTER_REVERT { - yield Err(CentralError::BlockNotFound { block_number: i }); - } - let is_reverted_state_diff = i.0 >= CHAIN_FORK_BLOCK_NUMBER; - yield Ok(( - i, - create_block_hash(i, is_reverted_state_diff), - StateDiff::default(), - IndexMap::new(), - )); - } - } - .boxed(), - } - } - - fn stream_compiled_classes( - &self, - _initial_block_number: BlockNumber, - _up_to_block_number: BlockNumber, - ) -> CompiledClassesStream<'_> { - // An empty stream. - let res: CompiledClassesStream<'_> = stream! { - for i in [] { - yield i; - } - } - .boxed(); - res - } - - async fn get_class( - &self, - _class_hash: ClassHash, - ) -> Result { - unimplemented!(); - } - - async fn get_compiled_class( - &self, - _class_hash: ClassHash, - ) -> Result { - unimplemented!(); - } - - async fn get_sequencer_pub_key(&self) -> Result { - unimplemented!() - } - } -} - #[tokio::test] async fn test_unrecoverable_sync_error_flow() { let _ = simple_logger::init_with_env();