Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions crates/apollo_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use std::sync::Arc;
use apollo_batcher_config::config::BatcherConfig;
use apollo_batcher_types::batcher_types::{
BatcherResult,
BatcherStorageReaderServerHandler,
BatcherStorageRequest,
BatcherStorageResponse,
CentralObjects,
DecisionReachedInput,
DecisionReachedResponse,
Expand Down Expand Up @@ -35,6 +38,7 @@ use apollo_reverts::revert_block;
use apollo_state_sync_types::state_sync_types::SyncBlock;
use apollo_storage::metrics::BATCHER_STORAGE_OPEN_READ_TRANSACTIONS;
use apollo_storage::state::{StateStorageReader, StateStorageWriter};
use apollo_storage::storage_reader_server::{create_storage_reader_server, StorageReaderServer};
use apollo_storage::{open_storage_with_metric, StorageReader, StorageResult, StorageWriter};
use async_trait::async_trait;
use blockifier::concurrency::worker_pool::WorkerPool;
Expand Down Expand Up @@ -145,6 +149,14 @@ pub struct Batcher {
/// The proposal commitment of the previous height.
/// This is returned by the decision_reached function.
prev_proposal_commitment: Option<(BlockNumber, ProposalCommitment)>,
/// Optional HTTP server to expose storage queries remotely.
storage_reader_server: Option<
StorageReaderServer<
BatcherStorageReaderServerHandler,
BatcherStorageRequest,
BatcherStorageResponse,
>,
>,
}

impl Batcher {
Expand All @@ -158,6 +170,13 @@ impl Batcher {
transaction_converter: TransactionConverter,
block_builder_factory: Box<dyn BlockBuilderFactoryTrait>,
pre_confirmed_block_writer_factory: Box<dyn PreconfirmedBlockWriterFactoryTrait>,
storage_reader_server: Option<
StorageReaderServer<
BatcherStorageReaderServerHandler,
BatcherStorageRequest,
BatcherStorageResponse,
>,
>,
) -> Self {
Self {
config,
Expand All @@ -177,6 +196,7 @@ impl Batcher {
// Allow the first few proposals to be without L1 txs while system starts up.
proposals_counter: 1,
prev_proposal_commitment: None,
storage_reader_server,
}
}

Expand Down Expand Up @@ -1024,6 +1044,16 @@ pub fn create_batcher(
open_storage_with_metric(config.storage.clone(), &BATCHER_STORAGE_OPEN_READ_TRANSACTIONS)
.expect("Failed to open batcher's storage");

let storage_reader_server = create_storage_reader_server::<
BatcherStorageReaderServerHandler,
BatcherStorageRequest,
BatcherStorageResponse,
>(
storage_reader.clone(),
config.storage_reader_server_config.socket,
config.storage_reader_server_config.enable,
);

let execute_config = &config.block_builder_config.execute_config;
let worker_pool = Arc::new(WorkerPool::start(execute_config));
let pre_confirmed_block_writer_factory = Box::new(PreconfirmedBlockWriterFactory {
Expand Down Expand Up @@ -1053,6 +1083,7 @@ pub fn create_batcher(
transaction_converter,
block_builder_factory,
pre_confirmed_block_writer_factory,
storage_reader_server,
)
}

Expand Down Expand Up @@ -1108,6 +1139,17 @@ impl BatcherStorageWriter for StorageWriter {
impl ComponentStarter for Batcher {
async fn start(&mut self) {
default_component_start_fn::<Self>().await;

// Start the storage reader server if configured
if let Some(server) = self.storage_reader_server.take() {
tokio::spawn(async move {
if let Err(e) = server.run().await {
error!("Batcher storage reader server error: {}", e);
}
});
info!("Batcher storage reader server started");
}

let storage_height = self
.storage_reader
.height()
Expand Down
7 changes: 7 additions & 0 deletions crates/apollo_batcher_config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use apollo_config::converters::deserialize_milliseconds_to_duration;
use apollo_config::dumping::{prepend_sub_config_name, ser_param, SerializeConfig};
use apollo_config::secrets::Sensitive;
use apollo_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use apollo_storage::storage_reader_server::ServerConfig as StorageReaderServerConfig;
use blockifier::blockifier::config::{ContractClassManagerConfig, WorkerPoolConfig};
use blockifier::blockifier_versioned_constants::VersionedConstantsOverrides;
use blockifier::bouncer::BouncerConfig;
Expand Down Expand Up @@ -153,6 +154,7 @@ pub struct BatcherConfig {
pub max_l1_handler_txs_per_block_proposal: usize,
pub pre_confirmed_cende_config: PreconfirmedCendeConfig,
pub propose_l1_txs_every: u64,
pub storage_reader_server_config: StorageReaderServerConfig,
}

impl SerializeConfig for BatcherConfig {
Expand Down Expand Up @@ -186,6 +188,10 @@ impl SerializeConfig for BatcherConfig {
),
]);
dump.append(&mut prepend_sub_config_name(self.storage.dump(), "storage"));
dump.append(&mut prepend_sub_config_name(
self.storage_reader_server_config.dump(),
"storage_reader_server",
));
dump.append(&mut prepend_sub_config_name(
self.block_builder_config.dump(),
"block_builder_config",
Expand Down Expand Up @@ -227,6 +233,7 @@ impl Default for BatcherConfig {
max_l1_handler_txs_per_block_proposal: 3,
pre_confirmed_cende_config: PreconfirmedCendeConfig::default(),
propose_l1_txs_every: 1, // Default is to propose L1 transactions every proposal.
storage_reader_server_config: StorageReaderServerConfig::default(),
}
}
}
Expand Down
Loading