Skip to content

Commit 16492e8

Browse files
apollo_gateway: remove processing tx block
1 parent 104a7e7 commit 16492e8

File tree

8 files changed

+157
-134
lines changed

8 files changed

+157
-134
lines changed

crates/apollo_gateway/src/gateway.rs

Lines changed: 28 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,13 @@ use apollo_proc_macros::sequencer_latency_histogram;
2626
use apollo_state_sync_types::communication::SharedStateSyncClient;
2727
use axum::async_trait;
2828
use blockifier::state::contract_class_manager::ContractClassManager;
29-
use starknet_api::core::Nonce;
30-
use starknet_api::executable_transaction::AccountTransaction;
3129
use starknet_api::rpc_transaction::{
3230
InternalRpcTransaction,
3331
InternalRpcTransactionWithoutTxHash,
3432
RpcDeclareTransaction,
3533
RpcTransaction,
3634
};
37-
use tracing::{debug, info, warn, Span};
35+
use tracing::{debug, warn, Span};
3836

3937
use crate::errors::{
4038
mempool_client_result_to_deprecated_gw_result,
@@ -153,33 +151,34 @@ impl Gateway {
153151
transaction_converter_err_to_deprecated_gw_err(&tx_signature, e)
154152
})?;
155153

156-
let blocking_task =
157-
ProcessTxBlockingTask::new(self, executable_tx, tokio::runtime::Handle::current());
158-
// Run the blocking task in the current span.
154+
let mut stateful_transaction_validator = self
155+
.stateful_tx_validator_factory
156+
.instantiate_validator(self.state_reader_factory.clone())
157+
.await
158+
.inspect_err(|e| metric_counters.record_add_tx_failure(e))?;
159+
159160
let curr_span = Span::current();
160-
let handle =
161-
tokio::task::spawn_blocking(move || curr_span.in_scope(|| blocking_task.process_tx()));
162-
let handle_result = handle.await;
163-
let nonce = match handle_result {
164-
Ok(Ok(nonce)) => nonce,
165-
Ok(Err(starknet_err)) => {
166-
info!(
167-
"Gateway validation failed for tx with signature: {:?} with error: {}",
168-
&tx_signature, starknet_err
169-
);
170-
metric_counters.record_add_tx_failure(&starknet_err);
171-
return Err(starknet_err);
172-
}
173-
Err(join_err) => {
174-
let err = StarknetError::internal_with_signature_logging(
175-
"Failed to process tx",
176-
&tx_signature,
177-
join_err,
178-
);
179-
metric_counters.record_add_tx_failure(&err);
180-
return Err(err);
181-
}
182-
};
161+
let mempool_client = self.mempool_client.clone();
162+
let nonce = tokio::task::spawn_blocking(move || {
163+
curr_span.in_scope(|| {
164+
stateful_transaction_validator.extract_state_nonce_and_run_validations(
165+
&executable_tx.clone(),
166+
mempool_client,
167+
tokio::runtime::Handle::current(),
168+
)
169+
})
170+
})
171+
.await
172+
.map_err(|e| {
173+
let err = StarknetError {
174+
code: StarknetErrorCode::UnknownErrorCode(
175+
"StarknetErrorCode.InternalError".to_string(),
176+
),
177+
message: format!("Validation task failed to complete: {e}"),
178+
};
179+
metric_counters.record_add_tx_failure(&err);
180+
err
181+
})??;
183182

184183
let gateway_output = create_gateway_output(&internal_tx);
185184

@@ -230,46 +229,6 @@ impl Gateway {
230229
}
231230
}
232231

233-
/// CPU-intensive transaction processing, spawned in a blocking thread to avoid blocking other tasks
234-
/// from running.
235-
struct ProcessTxBlockingTask {
236-
stateful_tx_validator_factory: Arc<dyn StatefulTransactionValidatorFactoryTrait>,
237-
state_reader_factory: Arc<dyn StateReaderFactory>,
238-
mempool_client: SharedMempoolClient,
239-
executable_tx: AccountTransaction,
240-
runtime: tokio::runtime::Handle,
241-
}
242-
243-
impl ProcessTxBlockingTask {
244-
pub fn new(
245-
gateway: &Gateway,
246-
executable_tx: AccountTransaction,
247-
runtime: tokio::runtime::Handle,
248-
) -> Self {
249-
Self {
250-
stateful_tx_validator_factory: gateway.stateful_tx_validator_factory.clone(),
251-
state_reader_factory: gateway.state_reader_factory.clone(),
252-
mempool_client: gateway.mempool_client.clone(),
253-
executable_tx,
254-
runtime,
255-
}
256-
}
257-
258-
fn process_tx(self) -> GatewayResult<Nonce> {
259-
let mut stateful_transaction_validator = self.runtime.block_on(
260-
self.stateful_tx_validator_factory.instantiate_validator(self.state_reader_factory),
261-
)?;
262-
263-
let nonce = stateful_transaction_validator.extract_state_nonce_and_run_validations(
264-
&self.executable_tx,
265-
self.mempool_client,
266-
self.runtime,
267-
)?;
268-
269-
Ok(nonce)
270-
}
271-
}
272-
273232
pub fn create_gateway(
274233
config: GatewayConfig,
275234
shared_state_sync_client: SharedStateSyncClient,

crates/apollo_gateway/src/gateway_fixed_block_state_reader.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
use std::sync::Arc;
22

33
use apollo_gateway_types::deprecated_gateway_error::StarknetError;
4-
use apollo_state_sync_types::communication::{SharedStateSyncClient, StateSyncClient};
4+
use apollo_state_sync_types::communication::{
5+
SharedStateSyncClient,
6+
StateSyncClient,
7+
StateSyncClientError,
8+
};
9+
use apollo_state_sync_types::errors::StateSyncError;
510
use async_trait::async_trait;
611
use starknet_api::block::{BlockInfo, BlockNumber, GasPriceVector, GasPrices};
12+
use starknet_api::core::{ContractAddress, Nonce};
713
use starknet_api::data_availability::L1DataAvailabilityMode;
814
use tokio::sync::OnceCell;
915

@@ -14,6 +20,7 @@ pub type StarknetResult<T> = Result<T, StarknetError>;
1420
#[async_trait]
1521
pub trait GatewayFixedBlockStateReader: Send + Sync {
1622
async fn get_block_info(&self) -> StarknetResult<BlockInfo>;
23+
async fn get_nonce(&self, contract_address: ContractAddress) -> StarknetResult<Nonce>;
1724
}
1825

1926
pub struct GatewayFixedBlockSyncStateClient {
@@ -67,4 +74,14 @@ impl GatewayFixedBlockStateReader for GatewayFixedBlockSyncStateClient {
6774
.await
6875
.cloned()
6976
}
77+
78+
async fn get_nonce(&self, contract_address: ContractAddress) -> StarknetResult<Nonce> {
79+
match self.state_sync_client.get_nonce_at(self.block_number, contract_address).await {
80+
Ok(nonce) => Ok(nonce),
81+
Err(StateSyncClientError::StateSyncError(StateSyncError::ContractNotFound(_))) => {
82+
Ok(Nonce::default())
83+
}
84+
Err(e) => Err(StarknetError::internal_with_logging("Failed to get nonce", e)),
85+
}
86+
}
7087
}

crates/apollo_gateway/src/gateway_test.rs

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ use strum::VariantNames;
8181
use tempfile::TempDir;
8282

8383
use crate::errors::{GatewayResult, StatelessTransactionValidatorError};
84-
use crate::gateway::{Gateway, ProcessTxBlockingTask};
84+
use crate::gateway::Gateway;
8585
use crate::metrics::{
8686
register_metrics,
8787
GatewayMetricHandle,
@@ -322,18 +322,6 @@ async fn run_add_tx_and_extract_metrics(
322322
AddTxResults { result, metric_handle_for_queries, metrics }
323323
}
324324

325-
fn process_tx_task(
326-
stateful_transaction_validator_factory: MockStatefulTransactionValidatorFactoryTrait,
327-
) -> ProcessTxBlockingTask {
328-
ProcessTxBlockingTask {
329-
stateful_tx_validator_factory: Arc::new(stateful_transaction_validator_factory),
330-
state_reader_factory: Arc::new(MockStateReaderFactory::new()),
331-
mempool_client: Arc::new(MockMempoolClient::new()),
332-
executable_tx: executable_invoke_tx(invoke_args()),
333-
runtime: tokio::runtime::Handle::current(),
334-
}
335-
}
336-
337325
// Gateway spec errors tests.
338326
// TODO(Arni): Add tests for all the error cases. Check the response (use `into_response` on the
339327
// result of `add_tx`).
@@ -556,10 +544,11 @@ fn test_full_cycle_dump_deserialize_authorized_declarer_accounts(
556544
StarknetErrorCode::UnknownErrorCode("StarknetErrorCode.InternalError".into())
557545
)]
558546
#[tokio::test]
559-
async fn process_tx_returns_error_when_extract_state_nonce_and_run_validations_fails(
547+
async fn add_tx_returns_error_when_run_transaction_validations_fails(
560548
#[case] error_code: StarknetErrorCode,
561549
mut mock_stateful_transaction_validator: MockStatefulTransactionValidatorTrait,
562550
mut mock_stateful_transaction_validator_factory: MockStatefulTransactionValidatorFactoryTrait,
551+
mut mock_dependencies: MockDependencies,
563552
) {
564553
let expected_error = StarknetError {
565554
code: error_code.clone(),
@@ -574,9 +563,18 @@ async fn process_tx_returns_error_when_extract_state_nonce_and_run_validations_f
574563
.expect_instantiate_validator()
575564
.return_once(|_| Ok(Box::new(mock_stateful_transaction_validator)));
576565

577-
let process_tx_task = process_tx_task(mock_stateful_transaction_validator_factory);
566+
let tx_args = invoke_args();
567+
setup_transaction_converter_mock(&mut mock_dependencies.mock_transaction_converter, &tx_args);
568+
let gateway = Gateway {
569+
config: Arc::new(mock_dependencies.config),
570+
stateless_tx_validator: Arc::new(mock_dependencies.mock_stateless_transaction_validator),
571+
stateful_tx_validator_factory: Arc::new(mock_stateful_transaction_validator_factory),
572+
state_reader_factory: Arc::new(MockStateReaderFactory::new()),
573+
mempool_client: Arc::new(mock_dependencies.mock_mempool_client),
574+
transaction_converter: Arc::new(mock_dependencies.mock_transaction_converter),
575+
};
578576

579-
let result = tokio::task::spawn_blocking(move || process_tx_task.process_tx()).await.unwrap();
577+
let result = gateway.add_tx(tx_args.get_rpc_tx(), None).await;
580578

581579
assert!(result.is_err());
582580
assert_eq!(result.unwrap_err().code, error_code);
@@ -605,8 +603,9 @@ async fn stateless_transaction_validator_error(mut mock_dependencies: MockDepend
605603

606604
#[rstest]
607605
#[tokio::test]
608-
async fn process_tx_returns_error_when_instantiating_validator_fails(
606+
async fn add_tx_returns_error_when_instantiating_validator_fails(
609607
mut mock_stateful_transaction_validator_factory: MockStatefulTransactionValidatorFactoryTrait,
608+
mut mock_dependencies: MockDependencies,
610609
) {
611610
let error_code = StarknetErrorCode::UnknownErrorCode("StarknetErrorCode.InternalError".into());
612611
let expected_error = StarknetError {
@@ -617,9 +616,18 @@ async fn process_tx_returns_error_when_instantiating_validator_fails(
617616
.expect_instantiate_validator()
618617
.return_once(|_| Err(expected_error));
619618

620-
let process_tx_task = process_tx_task(mock_stateful_transaction_validator_factory);
619+
let tx_args = invoke_args();
620+
setup_transaction_converter_mock(&mut mock_dependencies.mock_transaction_converter, &tx_args);
621+
let gateway = Gateway {
622+
config: Arc::new(mock_dependencies.config),
623+
stateless_tx_validator: Arc::new(mock_dependencies.mock_stateless_transaction_validator),
624+
stateful_tx_validator_factory: Arc::new(mock_stateful_transaction_validator_factory),
625+
state_reader_factory: Arc::new(MockStateReaderFactory::new()),
626+
mempool_client: Arc::new(mock_dependencies.mock_mempool_client),
627+
transaction_converter: Arc::new(mock_dependencies.mock_transaction_converter),
628+
};
621629

622-
let result = tokio::task::spawn_blocking(move || process_tx_task.process_tx()).await.unwrap();
630+
let result = gateway.add_tx(tx_args.get_rpc_tx(), None).await;
623631

624632
assert!(result.is_err());
625633
assert_eq!(result.unwrap_err().code, error_code);

crates/apollo_gateway/src/rpc_state_reader.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,27 @@ impl GatewayFixedBlockStateReader for RpcStateReader {
210210
})?;
211211
Ok(block_info)
212212
}
213+
214+
async fn get_nonce(&self, contract_address: ContractAddress) -> StarknetResult<Nonce> {
215+
let get_nonce_params = GetNonceParams { block_id: self.block_id, contract_address };
216+
let reader = self.clone();
217+
let result = tokio::task::spawn_blocking(move || {
218+
reader.send_rpc_request("starknet_getNonce", get_nonce_params)
219+
})
220+
.await
221+
.map_err(|e| StarknetError::internal_with_logging("JoinError", e))?;
222+
223+
match result {
224+
Ok(value) => {
225+
let nonce: Nonce = serde_json::from_value(value).map_err(|e| {
226+
StarknetError::internal_with_logging("Failed to parse nonce", e)
227+
})?;
228+
Ok(nonce)
229+
}
230+
Err(RPCStateReaderError::ContractAddressNotFound(_)) => Ok(Nonce::default()),
231+
Err(e) => Err(StarknetError::internal_with_logging("RPC error", e)),
232+
}
233+
}
213234
}
214235

215236
#[async_trait]

crates/apollo_gateway/src/state_reader_test_utils.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ impl GatewayFixedBlockStateReader for TestGatewayFixedBlockStateReader {
7979
async fn get_block_info(&self) -> StarknetResult<BlockInfo> {
8080
Ok(self.block_info.clone())
8181
}
82+
83+
async fn get_nonce(&self, _contract_address: ContractAddress) -> StarknetResult<Nonce> {
84+
Ok(Nonce::default())
85+
}
8286
}
8387

8488
pub struct TestStateReaderFactory {

crates/apollo_gateway/src/stateful_transaction_validator.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use starknet_types_core::felt::Felt;
3535
use tracing::debug;
3636

3737
use crate::errors::{mempool_client_err_to_deprecated_gw_err, StatefulTransactionValidatorResult};
38+
use crate::gateway_fixed_block_state_reader::GatewayFixedBlockStateReader;
3839
use crate::metrics::{GATEWAY_CLASS_CACHE_METRICS, GATEWAY_VALIDATE_TX_LATENCY};
3940
use crate::state_reader::{GatewayStateReaderWithCompiledClasses, StateReaderFactory};
4041

@@ -108,12 +109,13 @@ impl StatefulTransactionValidatorFactoryTrait for StatefulTransactionValidatorFa
108109
Ok(Box::new(StatefulTransactionValidator {
109110
config: self.config.clone(),
110111
blockifier_stateful_tx_validator,
112+
gateway_fixed_block_state_reader,
111113
}))
112114
}
113115
}
114116

115117
#[cfg_attr(test, mockall::automock)]
116-
pub trait StatefulTransactionValidatorTrait {
118+
pub trait StatefulTransactionValidatorTrait: Send {
117119
fn extract_state_nonce_and_run_validations(
118120
&mut self,
119121
executable_tx: &ExecutableTransaction,
@@ -125,9 +127,10 @@ pub trait StatefulTransactionValidatorTrait {
125127
pub struct StatefulTransactionValidator<B: BlockifierStatefulValidatorTrait> {
126128
config: StatefulTransactionValidatorConfig,
127129
blockifier_stateful_tx_validator: B,
130+
gateway_fixed_block_state_reader: Box<dyn GatewayFixedBlockStateReader>,
128131
}
129132

130-
impl<B: BlockifierStatefulValidatorTrait> StatefulTransactionValidatorTrait
133+
impl<B: BlockifierStatefulValidatorTrait + Send> StatefulTransactionValidatorTrait
131134
for StatefulTransactionValidator<B>
132135
{
133136
fn extract_state_nonce_and_run_validations(
@@ -137,22 +140,23 @@ impl<B: BlockifierStatefulValidatorTrait> StatefulTransactionValidatorTrait
137140
runtime: tokio::runtime::Handle,
138141
) -> StatefulTransactionValidatorResult<Nonce> {
139142
let address = executable_tx.contract_address();
140-
let account_nonce =
141-
self.blockifier_stateful_tx_validator.get_nonce(address).map_err(|e| {
143+
let account_nonce = runtime
144+
.block_on(self.gateway_fixed_block_state_reader.get_nonce(address))
145+
.map_err(|e| {
142146
// TODO(noamsp): Fix this. Need to map the errors better.
143147
StarknetError::internal_with_signature_logging(
144148
format!("Failed to get nonce for sender address {address}"),
145149
&executable_tx.signature(),
146150
e,
147151
)
148152
})?;
149-
self.run_transaction_validations(executable_tx, account_nonce, mempool_client, runtime)?;
153+
self.run_transaction_validation(executable_tx, account_nonce, mempool_client, runtime)?;
150154
Ok(account_nonce)
151155
}
152156
}
153157

154158
impl<B: BlockifierStatefulValidatorTrait> StatefulTransactionValidator<B> {
155-
fn run_transaction_validations(
159+
fn run_transaction_validation(
156160
&mut self,
157161
executable_tx: &ExecutableTransaction,
158162
account_nonce: Nonce,

0 commit comments

Comments
 (0)