Skip to content

Commit eb33c24

Browse files
apollo_state_sync: add storage_reader_server to the state sync
1 parent dfed5a3 commit eb33c24

3 files changed

Lines changed: 55 additions & 13 deletions

File tree

crates/apollo_state_sync/src/lib.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,18 @@ use apollo_starknet_client::reader::{StarknetFeederGatewayClient, StarknetReader
1212
use apollo_state_sync_config::config::StateSyncConfig;
1313
use apollo_state_sync_types::communication::{StateSyncRequest, StateSyncResponse};
1414
use apollo_state_sync_types::errors::StateSyncError;
15-
use apollo_state_sync_types::state_sync_types::{StateSyncResult, SyncBlock};
15+
use apollo_state_sync_types::state_sync_types::{
16+
StateSyncResult,
17+
StateSyncStorageReaderServerHandler,
18+
StateSyncStorageRequest,
19+
StateSyncStorageResponse,
20+
SyncBlock,
21+
};
1622
use apollo_storage::body::BodyStorageReader;
1723
use apollo_storage::db::TransactionKind;
1824
use apollo_storage::header::HeaderStorageReader;
1925
use apollo_storage::state::{StateReader, StateStorageReader};
26+
use apollo_storage::storage_reader_server::StorageReaderServer;
2027
use apollo_storage::{StorageReader, StorageTxn};
2128
use async_trait::async_trait;
2229
use futures::channel::mpsc::{channel, Sender};
@@ -30,6 +37,11 @@ use starknet_types_core::felt::Felt;
3037
use crate::runner::StateSyncRunner;
3138

3239
const BUFFER_SIZE: usize = 100000;
40+
type StateSyncStorageReaderServer = StorageReaderServer<
41+
StateSyncStorageReaderServerHandler,
42+
StateSyncStorageRequest,
43+
StateSyncStorageResponse,
44+
>;
3345

3446
pub fn create_state_sync_and_runner(
3547
config: StateSyncConfig,

crates/apollo_state_sync/src/runner/mod.rs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,13 @@ use apollo_state_sync_types::state_sync_types::SyncBlock;
4141
use apollo_storage::body::BodyStorageReader;
4242
use apollo_storage::header::HeaderStorageReader;
4343
use apollo_storage::metrics::SYNC_STORAGE_OPEN_READ_TRANSACTIONS;
44-
use apollo_storage::{open_storage_with_metric, StorageConfig, StorageReader, StorageWriter};
44+
use apollo_storage::storage_reader_server::ServerConfig;
45+
use apollo_storage::{
46+
open_storage_with_metric_and_server,
47+
StorageConfig,
48+
StorageReader,
49+
StorageWriter,
50+
};
4551
use async_trait::async_trait;
4652
use futures::channel::mpsc::Receiver;
4753
use futures::future::{self, pending, BoxFuture};
@@ -54,6 +60,8 @@ use tokio::sync::RwLock;
5460
use tracing::instrument::Instrument;
5561
use tracing::{debug, info_span};
5662

63+
use crate::StateSyncStorageReaderServer;
64+
5765
pub struct StateSyncRunner {
5866
network_future: BoxFuture<'static, Result<(), NetworkError>>,
5967
// TODO(Matan): change client and server to requester and responder respectively
@@ -63,6 +71,7 @@ pub struct StateSyncRunner {
6371
new_block_dev_null_future: BoxFuture<'static, Never>,
6472
rpc_server_future: BoxFuture<'static, ()>,
6573
register_metrics_future: BoxFuture<'static, ()>,
74+
pub storage_reader_server: Option<StateSyncStorageReaderServer>,
6675
}
6776

6877
#[async_trait]
@@ -98,13 +107,18 @@ pub struct StateSyncResources {
98107
pub shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
99108
pub pending_data: Arc<RwLock<PendingData>>,
100109
pub pending_classes: Arc<RwLock<PendingClasses>>,
110+
pub storage_reader_server: Option<StateSyncStorageReaderServer>,
101111
}
102112

103113
impl StateSyncResources {
104-
pub fn new(storage_config: &StorageConfig) -> Self {
105-
let (storage_reader, storage_writer) =
106-
open_storage_with_metric(storage_config.clone(), &SYNC_STORAGE_OPEN_READ_TRANSACTIONS)
107-
.expect("StateSyncRunner failed opening storage");
114+
pub fn new(storage_config: &StorageConfig, storage_reader_server_config: ServerConfig) -> Self {
115+
let (storage_reader, storage_writer, storage_reader_server) =
116+
open_storage_with_metric_and_server(
117+
storage_config.clone(),
118+
&SYNC_STORAGE_OPEN_READ_TRANSACTIONS,
119+
storage_reader_server_config,
120+
)
121+
.expect("StateSyncRunner failed opening storage");
108122
let shared_highest_block = Arc::new(RwLock::new(None));
109123
let pending_data = Arc::new(RwLock::new(PendingData {
110124
// The pending data might change later to DeprecatedPendingBlock, depending on the
@@ -116,7 +130,14 @@ impl StateSyncResources {
116130
..Default::default()
117131
}));
118132
let pending_classes = Arc::new(RwLock::new(PendingClasses::default()));
119-
Self { storage_reader, storage_writer, shared_highest_block, pending_data, pending_classes }
133+
Self {
134+
storage_reader,
135+
storage_writer,
136+
shared_highest_block,
137+
pending_data,
138+
pending_classes,
139+
storage_reader_server,
140+
}
120141
}
121142
}
122143

@@ -133,7 +154,7 @@ impl StateSyncRunner {
133154
network_config,
134155
revert_config,
135156
rpc_config,
136-
storage_reader_server_config: _,
157+
storage_reader_server_config,
137158
} = config;
138159

139160
let StateSyncResources {
@@ -142,7 +163,8 @@ impl StateSyncRunner {
142163
shared_highest_block,
143164
pending_data,
144165
pending_classes,
145-
} = StateSyncResources::new(&storage_config);
166+
storage_reader_server,
167+
} = StateSyncResources::new(&storage_config, storage_reader_server_config);
146168

147169
let register_metrics_future = register_metrics(storage_reader.clone()).boxed();
148170

@@ -203,6 +225,7 @@ impl StateSyncRunner {
203225
new_block_dev_null_future: pending().boxed(),
204226
rpc_server_future,
205227
register_metrics_future,
228+
storage_reader_server,
206229
},
207230
storage_reader,
208231
);
@@ -313,6 +336,7 @@ impl StateSyncRunner {
313336
new_block_dev_null_future,
314337
rpc_server_future,
315338
register_metrics_future,
339+
storage_reader_server,
316340
},
317341
storage_reader,
318342
)

crates/apollo_state_sync/src/test.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@ use crate::StateSync;
2424

2525
fn setup() -> (StateSync, StorageWriter) {
2626
let ((storage_reader, storage_writer), _) = get_test_storage();
27-
let state_sync =
28-
StateSync { storage_reader, new_block_sender: channel(0).0, starknet_client: None };
27+
let state_sync = StateSync {
28+
storage_reader,
29+
new_block_sender: channel(0).0,
30+
starknet_client: None,
31+
};
2932
(state_sync, storage_writer)
3033
}
3134

@@ -125,8 +128,11 @@ async fn test_get_block_hash_fallback_to_starknet_client() {
125128
let starknet_client: Option<Arc<dyn StarknetReader + Send + Sync>> =
126129
Some(Arc::new(starknet_client));
127130
let ((storage_reader, _storage_writer), _) = get_test_storage();
128-
let mut state_sync =
129-
StateSync { storage_reader, new_block_sender: channel(0).0, starknet_client };
131+
let mut state_sync = StateSync {
132+
storage_reader,
133+
new_block_sender: channel(0).0,
134+
starknet_client,
135+
};
130136

131137
// The block is not in storage, so it should fall back to starknet_client
132138
let response = state_sync.handle_request(StateSyncRequest::GetBlockHash(block_number)).await;

0 commit comments

Comments
 (0)