diff --git a/crates/apollo_state_sync/src/lib.rs b/crates/apollo_state_sync/src/lib.rs index ea43643fbf2..7de231b0e20 100644 --- a/crates/apollo_state_sync/src/lib.rs +++ b/crates/apollo_state_sync/src/lib.rs @@ -12,11 +12,18 @@ use apollo_starknet_client::reader::{StarknetFeederGatewayClient, StarknetReader use apollo_state_sync_config::config::StateSyncConfig; use apollo_state_sync_types::communication::{StateSyncRequest, StateSyncResponse}; use apollo_state_sync_types::errors::StateSyncError; -use apollo_state_sync_types::state_sync_types::{StateSyncResult, SyncBlock}; +use apollo_state_sync_types::state_sync_types::{ + StateSyncResult, + StateSyncStorageReaderServerHandler, + StateSyncStorageRequest, + StateSyncStorageResponse, + SyncBlock, +}; use apollo_storage::body::BodyStorageReader; use apollo_storage::db::TransactionKind; use apollo_storage::header::HeaderStorageReader; use apollo_storage::state::{StateReader, StateStorageReader}; +use apollo_storage::storage_reader_server::StorageReaderServer; use apollo_storage::{StorageReader, StorageTxn}; use async_trait::async_trait; use futures::channel::mpsc::{channel, Sender}; @@ -30,6 +37,11 @@ use starknet_types_core::felt::Felt; use crate::runner::StateSyncRunner; const BUFFER_SIZE: usize = 100000; +type StateSyncStorageReaderServer = StorageReaderServer< + StateSyncStorageReaderServerHandler, + StateSyncStorageRequest, + StateSyncStorageResponse, +>; pub fn create_state_sync_and_runner( config: StateSyncConfig, diff --git a/crates/apollo_state_sync/src/runner/mod.rs b/crates/apollo_state_sync/src/runner/mod.rs index bf4eb80ac04..4241e1bea0b 100644 --- a/crates/apollo_state_sync/src/runner/mod.rs +++ b/crates/apollo_state_sync/src/runner/mod.rs @@ -41,7 +41,13 @@ use apollo_state_sync_types::state_sync_types::SyncBlock; use apollo_storage::body::BodyStorageReader; use apollo_storage::header::HeaderStorageReader; use apollo_storage::metrics::SYNC_STORAGE_OPEN_READ_TRANSACTIONS; -use apollo_storage::{open_storage_with_metric, StorageConfig, StorageReader, StorageWriter}; +use apollo_storage::storage_reader_server::ServerConfig; +use apollo_storage::{ + open_storage_with_metric_and_server, + StorageConfig, + StorageReader, + StorageWriter, +}; use async_trait::async_trait; use futures::channel::mpsc::Receiver; use futures::future::{self, pending, BoxFuture}; @@ -54,6 +60,8 @@ use tokio::sync::RwLock; use tracing::instrument::Instrument; use tracing::{debug, info_span}; +use crate::StateSyncStorageReaderServer; + pub struct StateSyncRunner { network_future: BoxFuture<'static, Result<(), NetworkError>>, // TODO(Matan): change client and server to requester and responder respectively @@ -63,6 +71,7 @@ pub struct StateSyncRunner { new_block_dev_null_future: BoxFuture<'static, Never>, rpc_server_future: BoxFuture<'static, ()>, register_metrics_future: BoxFuture<'static, ()>, + pub storage_reader_server: Option, } #[async_trait] @@ -98,13 +107,18 @@ pub struct StateSyncResources { pub shared_highest_block: Arc>>, pub pending_data: Arc>, pub pending_classes: Arc>, + pub storage_reader_server: Option, } impl StateSyncResources { - pub fn new(storage_config: &StorageConfig) -> Self { - let (storage_reader, storage_writer) = - open_storage_with_metric(storage_config.clone(), &SYNC_STORAGE_OPEN_READ_TRANSACTIONS) - .expect("StateSyncRunner failed opening storage"); + pub fn new(storage_config: &StorageConfig, storage_reader_server_config: ServerConfig) -> Self { + let (storage_reader, storage_writer, storage_reader_server) = + open_storage_with_metric_and_server( + storage_config.clone(), + &SYNC_STORAGE_OPEN_READ_TRANSACTIONS, + storage_reader_server_config, + ) + .expect("StateSyncRunner failed opening storage"); let shared_highest_block = Arc::new(RwLock::new(None)); let pending_data = Arc::new(RwLock::new(PendingData { // The pending data might change later to DeprecatedPendingBlock, depending on the @@ -116,7 +130,14 @@ impl StateSyncResources { ..Default::default() })); let pending_classes = Arc::new(RwLock::new(PendingClasses::default())); - Self { storage_reader, storage_writer, shared_highest_block, pending_data, pending_classes } + Self { + storage_reader, + storage_writer, + shared_highest_block, + pending_data, + pending_classes, + storage_reader_server, + } } } @@ -133,7 +154,7 @@ impl StateSyncRunner { network_config, revert_config, rpc_config, - storage_reader_server_config: _, + storage_reader_server_config, } = config; let StateSyncResources { @@ -142,7 +163,8 @@ impl StateSyncRunner { shared_highest_block, pending_data, pending_classes, - } = StateSyncResources::new(&storage_config); + storage_reader_server, + } = StateSyncResources::new(&storage_config, storage_reader_server_config); let register_metrics_future = register_metrics(storage_reader.clone()).boxed(); @@ -203,6 +225,7 @@ impl StateSyncRunner { new_block_dev_null_future: pending().boxed(), rpc_server_future, register_metrics_future, + storage_reader_server, }, storage_reader, ); @@ -313,6 +336,7 @@ impl StateSyncRunner { new_block_dev_null_future, rpc_server_future, register_metrics_future, + storage_reader_server, }, storage_reader, ) diff --git a/crates/apollo_state_sync/src/runner/test.rs b/crates/apollo_state_sync/src/runner/test.rs index 95bd928e210..4bf5ac6218a 100644 --- a/crates/apollo_state_sync/src/runner/test.rs +++ b/crates/apollo_state_sync/src/runner/test.rs @@ -24,6 +24,7 @@ fn run_panics_when_network_future_returns() { new_block_dev_null_future, rpc_server_future, register_metrics_future, + storage_reader_server: None, }; state_sync_runner.start().now_or_never().unwrap(); } @@ -47,6 +48,7 @@ fn run_panics_when_network_future_returns_error() { new_block_dev_null_future, rpc_server_future, register_metrics_future, + storage_reader_server: None, }; state_sync_runner.start().now_or_never().unwrap(); } @@ -69,6 +71,7 @@ fn run_panics_when_sync_client_future_returns_error() { new_block_dev_null_future, rpc_server_future, register_metrics_future, + storage_reader_server: None, }; state_sync_runner.start().now_or_never().unwrap(); }