diff --git a/Cargo.lock b/Cargo.lock index d31c9216..0009e330 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1634,10 +1634,16 @@ dependencies = [ "alloy-genesis", "alloy-primitives", "alloy-rpc-client", + "chrono", "eyre", + "indexmap 2.12.0", "jsonrpsee 0.26.0", + "metrics", "op-alloy-consensus 0.22.0", + "op-alloy-flz", + "parking_lot", "rand 0.9.2", + "rdkafka", "reth", "reth-db", "reth-db-common", @@ -1647,10 +1653,13 @@ dependencies = [ "reth-optimism-cli", "reth-optimism-evm 1.9.1", "reth-optimism-node", + "reth-optimism-payload-builder 1.9.1", "reth-optimism-primitives 1.9.1", + "reth-optimism-rpc", "reth-primitives", "reth-primitives-traits 1.9.1", "reth-provider", + "reth-rpc-server-types", "reth-testing-utils", "reth-tracing", "reth-transaction-pool 1.9.1", @@ -1660,6 +1669,7 @@ dependencies = [ "tips-core", "tokio", "tracing", + "tracing-subscriber 0.3.20", ] [[package]] @@ -1693,6 +1703,8 @@ dependencies = [ "op-alloy-rpc-jsonrpsee", "op-alloy-rpc-types 0.22.0", "op-alloy-rpc-types-engine 0.22.0", + "parking_lot", + "rdkafka", "reqwest", "reth", "reth-cli-util", @@ -1706,6 +1718,7 @@ dependencies = [ "reth-primitives", "reth-rpc-convert", "reth-rpc-eth-api", + "reth-rpc-server-types", "revm 31.0.1", "revm-bytecode 7.1.1", "rollup-boost", @@ -1713,6 +1726,7 @@ dependencies = [ "serde", "serde_json", "time", + "tips-core", "tokio", "tokio-stream", "tokio-tungstenite", @@ -6221,6 +6235,15 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "openssl-src" +version = "300.5.4+3.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507b3792995dae9b0df8a1c1e3771e8418b7c2d9f0baeba32e6fe8b06c7cb72" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.110" @@ -6229,6 +6252,7 @@ checksum = "0a9f0075ba3c21b09f8e8b2026584b1d18d49388648f2fbbf3c97ea8deced8e2" dependencies = [ "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] @@ -7236,6 +7260,37 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rdkafka" +version = "0.37.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14b52c81ac3cac39c9639b95c20452076e74b8d9a71bc6fc4d83407af2ea6fff" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.9.0+2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5230dca48bc354d718269f3e4353280e188b610f7af7e2fcf54b7a79d5802872" +dependencies = [ + "libc", + "libz-sys", + "num_enum", + "openssl-sys", + "pkg-config", +] + [[package]] name = "recvmsg" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 88449a2f..b06bad32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ reth-rpc-eth-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" reth-optimism-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" } reth-rpc-convert = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" } reth-optimism-rpc = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" } +reth-optimism-payload-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" } reth-optimism-evm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" } reth-optimism-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" } reth-provider = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" } @@ -69,6 +70,7 @@ reth-exex = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" } reth-db = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" } reth-testing-utils = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" } reth-db-common = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" } +reth-rpc-server-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" } # revm revm = { version = "31.0.0", default-features = false } @@ -96,6 +98,8 @@ op-alloy-rpc-types-engine = { version = "0.22.0", default-features = false } op-alloy-rpc-jsonrpsee = { version = "0.22.0", default-features = false } op-alloy-network = { version = "0.22.0", default-features = false } op-alloy-consensus = { version = "0.22.0", default-features = false } +op-alloy-flz = { version = "0.13.1", default-features = false } +rdkafka = { version = "0.37.0", default-features = false, features = ["tokio", "ssl-vendored", "libz-static"] } # rollup-boost rollup-boost = { git = "http://github.com/flashbots/rollup-boost", tag = "rollup-boost/v0.7.5" } @@ -132,3 +136,6 @@ brotli = "8.0.1" arc-swap = "1.7.1" once_cell = "1.19" rand = "0.9.2" +tracing-subscriber = "0.3.18" +parking_lot = "0.12.3" +indexmap = "2.7.0" diff --git a/crates/metering/Cargo.toml b/crates/metering/Cargo.toml index 42341f6c..c85b74c2 100644 --- a/crates/metering/Cargo.toml +++ b/crates/metering/Cargo.toml @@ -23,6 +23,7 @@ reth-primitives-traits.workspace = true reth-evm.workspace = true reth-optimism-evm.workspace = true reth-optimism-chainspec.workspace = true +reth-optimism-payload-builder.workspace = true reth-optimism-primitives.workspace = true reth-transaction-pool.workspace = true reth-optimism-cli.workspace = true # Enables serde & codec traits for OpReceipt/OpTxEnvelope @@ -34,6 +35,7 @@ alloy-eips.workspace = true # op-alloy op-alloy-consensus.workspace = true +op-alloy-flz.workspace = true # revm revm.workspace = true @@ -45,6 +47,13 @@ jsonrpsee.workspace = true tracing.workspace = true serde.workspace = true eyre.workspace = true +indexmap.workspace = true +parking_lot.workspace = true +tokio.workspace = true +rdkafka.workspace = true +serde_json.workspace = true +metrics.workspace = true +chrono.workspace = true [dev-dependencies] alloy-genesis.workspace = true @@ -54,10 +63,10 @@ reth-db = { workspace = true, features = ["test-utils"] } reth-db-common.workspace = true reth-e2e-test-utils.workspace = true reth-optimism-node.workspace = true +reth-optimism-payload-builder.workspace = true +reth-optimism-rpc.workspace = true +reth-rpc-server-types.workspace = true reth-testing-utils.workspace = true reth-tracing.workspace = true reth-transaction-pool = { workspace = true, features = ["test-utils"] } -serde_json.workspace = true -tokio.workspace = true - - +tracing-subscriber.workspace = true diff --git a/crates/metering/README.md b/crates/metering/README.md index 1cc88343..d8f86afa 100644 --- a/crates/metering/README.md +++ b/crates/metering/README.md @@ -60,3 +60,40 @@ Note: While some fields like `revertingTxHashes` are part of the TIPS Bundle for - Stops on first failure - Automatically registered in `base` namespace +## Upcoming Features + +- **In-memory metering cache:** Maintains per-flashblock resource snapshots (gas, DA bytes, + execution time, state-root time) for the latest 12 blocks to support pricing decisions. +- **Stream ingestion:** Background tasks will hydrate the cache by consuming the TIPS Kafka + feed (for timing metrics) and the flashblocks websocket stream (for inclusion order). +- **Priority-fee estimator:** Aggregates cached data in ascending priority-fee order to + project the fee a bundle must pay to satisfy each resource constraint, including + percentile-based recommendations. (Core logic implemented; awaiting live data feed.) +- **`base_meteredPriorityFeePerGas` RPC:** Accepts a TIPS Bundle, meters it locally, and + responds with per-resource fee suggestions for each flashblock index plus aggregated + min/max guidance for next-block and next-flashblock inclusion. Currently returns an error + until the ingestion tasks populate the metering cache. + +### Observability Contract + +- **Gauges**: `metering.kafka.lag_ms`, `metering.cache.latest_block`, `metering.cache.window_depth` + track data freshness and cache depth. +- **Counters**: `metering.kafka.messages_total`, `metering.kafka.errors_total`, + `metering.kafka.messages_skipped`, `metering.streams.flashblocks_total`, + `metering.streams.misses_total`, `metering.cache.tx_events_total` capture ingestion health. +- Detailed per-transaction diagnostics (missing flashblock index, snapshot drops) are emitted as + structured tracing events instead of high-cardinality metrics. + +## Testing & Observability Plan + +- **Unit coverage:** Exercise cache eviction, transaction ordering, and estimator threshold + logic with synthetic datasets (see `cache.rs` and `estimator.rs` tests). Extend with Kafka + parsing tests once the ingest message schema is integrated. +- **Integration harness:** Feed mocked Kafka + websocket streams into the ingest pipeline to + validate cache hydration and end-to-end RPC responses. Leverage existing async test + utilities in the workspace for deterministic sequencing. +- **Property-style checks:** Generate random transaction fee/usage distributions to ensure the + estimator produces monotonic thresholds and sensible percentiles across resource types. +- **Metrics & tracing:** Emit gauges for cache freshness (latest block/index), Kafka lag, + websocket heartbeat, and estimator latency. Reuse the existing `tracing` instrumentation + pattern in the repo so operators can alert on stale data paths. diff --git a/crates/metering/src/annotator.rs b/crates/metering/src/annotator.rs new file mode 100644 index 00000000..3e41d4dc --- /dev/null +++ b/crates/metering/src/annotator.rs @@ -0,0 +1,135 @@ +use crate::{MeteredTransaction, MeteringCache}; +use alloy_primitives::TxHash; +use parking_lot::RwLock; +use std::sync::Arc; +use tokio::sync::mpsc::UnboundedReceiver; +use tracing::{debug, info, warn}; + +/// Message received from the flashblocks websocket feed indicating which +/// transactions were included in a specific flashblock. +#[derive(Debug)] +pub struct FlashblockInclusion { + pub block_number: u64, + pub flashblock_index: u64, + /// Tx hashes included in this flashblock. + pub ordered_tx_hashes: Vec, +} + +/// Maximum number of pending transactions before oldest entries are evicted. +const MAX_PENDING_TRANSACTIONS: usize = 10_000; + +/// Annotates flashblock transactions with their resource usage. +/// +/// The flow is: +/// 1. Kafka sends `MeteredTransaction` with resource usage data keyed by tx hash +/// 2. These are stored in a pending lookup table +/// 3. Websocket sends `FlashblockInclusion` with actual (block, flashblock) location +/// 4. We look up pending transactions and insert them into the cache at the real location +pub struct ResourceAnnotator { + cache: Arc>, + tx_updates_rx: UnboundedReceiver, + flashblock_rx: UnboundedReceiver, + /// Pending metering data awaiting flashblock inclusion confirmation. + /// Uses IndexMap to maintain insertion order for FIFO eviction. + pending_transactions: indexmap::IndexMap, +} + +impl ResourceAnnotator { + pub fn new( + cache: Arc>, + tx_updates_rx: UnboundedReceiver, + flashblock_rx: UnboundedReceiver, + ) -> Self { + Self { + cache, + tx_updates_rx, + flashblock_rx, + pending_transactions: indexmap::IndexMap::new(), + } + } + + pub async fn run(mut self) { + info!(target: "metering::annotator", "Starting ResourceAnnotator"); + loop { + tokio::select! { + Some(tx_event) = self.tx_updates_rx.recv() => { + self.handle_tx_event(tx_event); + } + Some(flashblock_event) = self.flashblock_rx.recv() => { + self.handle_flashblock_event(flashblock_event); + } + else => { + info!(target: "metering::annotator", "ResourceAnnotator terminating"); + break; + } + } + } + } + + fn handle_tx_event(&mut self, tx: MeteredTransaction) { + debug!( + tx_hash = %tx.tx_hash, + gas_used = tx.gas_used, + "Storing metered transaction in pending map" + ); + self.pending_transactions.insert(tx.tx_hash, tx); + + // Evict oldest entries if we exceed the limit. + while self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS { + if let Some((evicted_hash, _)) = self.pending_transactions.shift_remove_index(0) { + info!( + tx_hash = %evicted_hash, + "Evicting old transaction from pending map (limit exceeded)" + ); + metrics::counter!("metering.pending.evicted").increment(1); + } + } + + metrics::gauge!("metering.pending.size").set(self.pending_transactions.len() as f64); + metrics::counter!("metering.kafka.tx_events_total").increment(1); + } + + fn handle_flashblock_event(&mut self, event: FlashblockInclusion) { + metrics::counter!("metering.streams.flashblocks_total").increment(1); + + let mut matched = 0usize; + let mut missed = 0usize; + + { + let mut cache = self.cache.write(); + for tx_hash in &event.ordered_tx_hashes { + if let Some(tx) = self.pending_transactions.shift_remove(tx_hash) { + cache.upsert_transaction(event.block_number, event.flashblock_index, tx); + matched += 1; + } else { + missed += 1; + } + } + } + + if matched > 0 { + debug!( + block_number = event.block_number, + flashblock_index = event.flashblock_index, + matched, + "Inserted transactions into cache from flashblock" + ); + } + + // All transactions should come through as bundles. Any misses indicate + // the Kafka event hasn't arrived yet or was lost. + if missed > 0 { + warn!( + block_number = event.block_number, + flashblock_index = event.flashblock_index, + matched, + missed, + "Flashblock contained transactions not found in pending map" + ); + metrics::counter!("metering.streams.tx_misses_total").increment(missed as u64); + } + + metrics::gauge!("metering.pending.size").set(self.pending_transactions.len() as f64); + metrics::counter!("metering.streams.tx_matched_total").increment(matched as u64); + } +} diff --git a/crates/metering/src/cache.rs b/crates/metering/src/cache.rs new file mode 100644 index 00000000..0844b3a3 --- /dev/null +++ b/crates/metering/src/cache.rs @@ -0,0 +1,376 @@ +use alloy_primitives::B256; +use indexmap::IndexMap; +use std::collections::{BTreeMap, HashMap, VecDeque}; + +use alloy_primitives::U256; + +#[derive(Debug, Clone)] +pub struct MeteredTransaction { + pub tx_hash: B256, + pub priority_fee_per_gas: U256, + pub gas_used: u64, + pub execution_time_us: u128, + pub state_root_time_us: u128, + pub data_availability_bytes: u64, +} + +impl MeteredTransaction { + pub fn zeroed(tx_hash: B256) -> Self { + Self { + tx_hash, + priority_fee_per_gas: U256::ZERO, + gas_used: 0, + execution_time_us: 0, + state_root_time_us: 0, + data_availability_bytes: 0, + } + } +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct ResourceTotals { + pub gas_used: u64, + pub execution_time_us: u128, + pub state_root_time_us: u128, + pub data_availability_bytes: u64, +} + +impl ResourceTotals { + fn accumulate(&mut self, tx: &MeteredTransaction) { + self.gas_used = self.gas_used.saturating_add(tx.gas_used); + self.execution_time_us = self.execution_time_us.saturating_add(tx.execution_time_us); + self.state_root_time_us = self + .state_root_time_us + .saturating_add(tx.state_root_time_us); + self.data_availability_bytes = self + .data_availability_bytes + .saturating_add(tx.data_availability_bytes); + } + + fn subtract(&mut self, tx: &MeteredTransaction) { + self.gas_used = self.gas_used.saturating_sub(tx.gas_used); + self.execution_time_us = self.execution_time_us.saturating_sub(tx.execution_time_us); + self.state_root_time_us = self + .state_root_time_us + .saturating_sub(tx.state_root_time_us); + self.data_availability_bytes = self + .data_availability_bytes + .saturating_sub(tx.data_availability_bytes); + } +} + +/// Metrics for a single flashblock within a block. +#[derive(Debug)] +pub struct FlashblockMetrics { + pub block_number: u64, + pub flashblock_index: u64, + /// Transactions keyed by hash in insertion order. + transactions: IndexMap, + totals: ResourceTotals, +} + +impl FlashblockMetrics { + pub fn new(block_number: u64, flashblock_index: u64) -> Self { + Self { + block_number, + flashblock_index, + transactions: IndexMap::new(), + totals: ResourceTotals::default(), + } + } + + pub fn upsert_transaction(&mut self, tx: MeteredTransaction) { + let tx_hash = tx.tx_hash; + if let Some(existing) = self.transactions.get(&tx_hash) { + self.totals.subtract(existing); + } + self.totals.accumulate(&tx); + self.transactions.insert(tx_hash, tx); + } + + pub fn remove_transaction(&mut self, tx_hash: &B256) -> Option { + let removed = self.transactions.shift_remove(tx_hash); + if let Some(ref tx) = removed { + self.totals.subtract(tx); + } + removed + } + + pub fn totals(&self) -> ResourceTotals { + self.totals + } + + pub fn transactions(&self) -> impl Iterator { + self.transactions.values() + } + + pub fn transactions_sorted_by_priority_fee(&self) -> Vec<&MeteredTransaction> { + let mut txs: Vec<&MeteredTransaction> = self.transactions.values().collect(); + txs.sort_by(|a, b| a.priority_fee_per_gas.cmp(&b.priority_fee_per_gas)); + txs + } + + pub fn len(&self) -> usize { + self.transactions.len() + } + + pub fn is_empty(&self) -> bool { + self.transactions.is_empty() + } +} + +/// Aggregated metrics for a block, including per-flashblock breakdown. +#[derive(Debug)] +pub struct BlockMetrics { + pub block_number: u64, + flashblocks: BTreeMap, + totals: ResourceTotals, +} + +impl BlockMetrics { + pub fn new(block_number: u64) -> Self { + Self { + block_number, + flashblocks: BTreeMap::new(), + totals: ResourceTotals::default(), + } + } + + pub fn flashblock_count(&self) -> usize { + self.flashblocks.len() + } + + pub fn flashblocks(&self) -> impl Iterator { + self.flashblocks.values() + } + + pub fn flashblock(&self, flashblock_index: u64) -> Option<&FlashblockMetrics> { + self.flashblocks.get(&flashblock_index) + } + + pub fn flashblock_mut(&mut self, flashblock_index: u64) -> (&mut FlashblockMetrics, bool) { + let is_new = !self.flashblocks.contains_key(&flashblock_index); + let entry = self + .flashblocks + .entry(flashblock_index) + .or_insert_with(|| FlashblockMetrics::new(self.block_number, flashblock_index)); + (entry, is_new) + } + + pub fn totals(&self) -> ResourceTotals { + self.totals + } + + fn recompute_totals(&mut self) { + self.totals = ResourceTotals::default(); + for flashblock in self.flashblocks.values() { + let totals = flashblock.totals(); + self.totals.gas_used = self.totals.gas_used.saturating_add(totals.gas_used); + self.totals.execution_time_us = self + .totals + .execution_time_us + .saturating_add(totals.execution_time_us); + self.totals.state_root_time_us = self + .totals + .state_root_time_us + .saturating_add(totals.state_root_time_us); + self.totals.data_availability_bytes = self + .totals + .data_availability_bytes + .saturating_add(totals.data_availability_bytes); + } + } +} + +/// In-memory cache maintaining metering data for the most recent blocks. +#[derive(Debug)] +pub struct MeteringCache { + max_blocks: usize, + blocks: VecDeque, + block_index: HashMap, +} + +impl MeteringCache { + /// Creates a new cache retaining at most `max_blocks` recent blocks. + pub fn new(max_blocks: usize) -> Self { + Self { + max_blocks, + blocks: VecDeque::new(), + block_index: HashMap::new(), + } + } + + pub fn max_blocks(&self) -> usize { + self.max_blocks + } + + pub fn block(&self, block_number: u64) -> Option<&BlockMetrics> { + self.block_index + .get(&block_number) + .and_then(|&idx| self.blocks.get(idx)) + } + + pub fn block_mut(&mut self, block_number: u64) -> &mut BlockMetrics { + if let Some(&idx) = self.block_index.get(&block_number) { + return self.blocks.get_mut(idx).expect("block index out of bounds"); + } + + let block = BlockMetrics::new(block_number); + self.blocks.push_back(block); + let idx = self.blocks.len() - 1; + self.block_index.insert(block_number, idx); + + self.evict_if_needed(); + self.blocks + .get_mut(*self.block_index.get(&block_number).unwrap()) + .unwrap() + } + + pub fn flashblock( + &self, + block_number: u64, + flashblock_index: u64, + ) -> Option<&FlashblockMetrics> { + self.block(block_number) + .and_then(|block| block.flashblock(flashblock_index)) + } + + pub fn upsert_transaction( + &mut self, + block_number: u64, + flashblock_index: u64, + tx: MeteredTransaction, + ) { + let block = self.block_mut(block_number); + let (flashblock, _) = block.flashblock_mut(flashblock_index); + flashblock.upsert_transaction(tx); + block.recompute_totals(); + } + + pub fn remove_transaction( + &mut self, + block_number: u64, + flashblock_index: u64, + tx_hash: &B256, + ) -> Option { + let block = self.block_mut(block_number); + let (flashblock, _) = block.flashblock_mut(flashblock_index); + let removed = flashblock.remove_transaction(tx_hash); + block.recompute_totals(); + removed + } + + pub fn len(&self) -> usize { + self.blocks.len() + } + + pub fn is_empty(&self) -> bool { + self.blocks.is_empty() + } + + pub fn blocks_desc(&self) -> impl Iterator { + self.blocks.iter().rev() + } + + fn evict_if_needed(&mut self) { + let mut evicted = false; + while self.blocks.len() > self.max_blocks { + if let Some(oldest) = self.blocks.pop_front() { + self.block_index.remove(&oldest.block_number); + evicted = true; + } + } + // Rebuild index once after all evictions to maintain correctness. + if evicted { + self.rebuild_index(); + } + } + + fn rebuild_index(&mut self) { + self.block_index.clear(); + for (idx, block) in self.blocks.iter().enumerate() { + self.block_index.insert(block.block_number, idx); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn test_tx(hash: u64, priority: u64) -> MeteredTransaction { + let mut hash_bytes = [0u8; 32]; + hash_bytes[24..].copy_from_slice(&hash.to_be_bytes()); + MeteredTransaction { + tx_hash: B256::new(hash_bytes), + priority_fee_per_gas: U256::from(priority), + gas_used: 10, + execution_time_us: 5, + state_root_time_us: 7, + data_availability_bytes: 20, + } + } + + #[test] + fn insert_and_retrieve_transactions() { + let mut cache = MeteringCache::new(12); + let tx1 = test_tx(1, 2); + cache.upsert_transaction(100, 0, tx1.clone()); + + let block = cache.block(100).unwrap(); + let flashblock = block.flashblocks().next().unwrap(); + assert_eq!(flashblock.len(), 1); + assert_eq!( + flashblock.transactions().next().unwrap().tx_hash, + tx1.tx_hash + ); + } + + #[test] + fn replaces_existing_transaction() { + let mut cache = MeteringCache::new(12); + let mut tx1 = test_tx(1, 2); + cache.upsert_transaction(100, 0, tx1.clone()); + tx1.gas_used = 42; + cache.upsert_transaction(100, 0, tx1.clone()); + + let block = cache.block(100).unwrap(); + let flashblock = block.flashblocks().next().unwrap(); + assert_eq!(flashblock.len(), 1); + assert_eq!( + flashblock.transactions().next().unwrap().gas_used, + tx1.gas_used + ); + } + + #[test] + fn evicts_old_blocks() { + let mut cache = MeteringCache::new(2); + for block_number in 0..3u64 { + cache.upsert_transaction(block_number, 0, test_tx(block_number, block_number)); + } + assert!(cache.block(0).is_none()); + assert!(cache.block(1).is_some()); + assert!(cache.block(2).is_some()); + } + + #[test] + fn transactions_sorted_by_priority_fee() { + let mut cache = MeteringCache::new(12); + cache.upsert_transaction(100, 0, test_tx(1, 30)); + cache.upsert_transaction(100, 0, test_tx(2, 10)); + cache.upsert_transaction(100, 0, test_tx(3, 20)); + + let block = cache.block(100).unwrap(); + let flashblock = block.flashblocks().next().unwrap(); + let sorted: Vec<_> = flashblock + .transactions_sorted_by_priority_fee() + .iter() + .map(|tx| tx.priority_fee_per_gas) + .collect(); + assert_eq!( + sorted, + vec![U256::from(10u64), U256::from(20u64), U256::from(30u64)] + ); + } +} diff --git a/crates/metering/src/estimator.rs b/crates/metering/src/estimator.rs new file mode 100644 index 00000000..48d1c2b7 --- /dev/null +++ b/crates/metering/src/estimator.rs @@ -0,0 +1,916 @@ +use crate::{MeteredTransaction, MeteringCache}; +use alloy_primitives::U256; +use parking_lot::RwLock; +use reth_optimism_payload_builder::config::OpDAConfig; +use std::sync::Arc; + +/// Errors that can occur during priority fee estimation. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum EstimateError { + /// The bundle's resource demand exceeds the configured capacity limit. + DemandExceedsCapacity { + resource: ResourceKind, + demand: u128, + limit: u128, + }, +} + +impl std::fmt::Display for EstimateError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + EstimateError::DemandExceedsCapacity { + resource, + demand, + limit, + } => { + write!( + f, + "bundle {} demand ({}) exceeds capacity limit ({})", + resource.as_name(), + demand, + limit + ) + } + } + } +} + +impl std::error::Error for EstimateError {} + +/// Configured capacity limits for each resource type. +/// +/// These values define the maximum capacity available per flashblock (or per block +/// for "use-it-or-lose-it" resources). The estimator uses these limits to determine +/// when resources are congested. +#[derive(Debug, Clone, Copy, Default)] +pub struct ResourceLimits { + pub gas_used: Option, + pub execution_time_us: Option, + pub state_root_time_us: Option, + pub data_availability_bytes: Option, +} + +impl ResourceLimits { + /// Returns the limit for the given resource kind. + fn limit_for(&self, resource: ResourceKind) -> Option { + match resource { + ResourceKind::GasUsed => self.gas_used.map(|v| v as u128), + ResourceKind::ExecutionTime => self.execution_time_us, + ResourceKind::StateRootTime => self.state_root_time_us, + ResourceKind::DataAvailability => self.data_availability_bytes.map(|v| v as u128), + } + } +} + +/// Resources that influence flashblock inclusion ordering. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum ResourceKind { + GasUsed, + ExecutionTime, + StateRootTime, + DataAvailability, +} + +impl ResourceKind { + /// Returns all resource kinds in a fixed order. + pub fn all() -> [ResourceKind; 4] { + [ + ResourceKind::GasUsed, + ResourceKind::ExecutionTime, + ResourceKind::StateRootTime, + ResourceKind::DataAvailability, + ] + } + + /// Returns `true` if this resource is "use-it-or-lose-it", meaning capacity + /// that isn't consumed in one flashblock cannot be reclaimed in later ones. + /// + /// Execution time is the canonical example: the block builder has a fixed + /// time budget per block, and unused time in flashblock 0 doesn't roll over + /// to flashblock 1. For these resources, the estimator aggregates usage + /// across all flashblocks rather than evaluating each flashblock in isolation. + /// + /// Other resources like gas and DA bytes are bounded per-block but are + /// evaluated per-flashblock since their limits apply independently. + fn use_it_or_lose_it(self) -> bool { + matches!(self, ResourceKind::ExecutionTime) + } + + /// Returns a human-readable name for the resource kind. + pub fn as_name(&self) -> &'static str { + match self { + ResourceKind::GasUsed => "gas", + ResourceKind::ExecutionTime => "execution time", + ResourceKind::StateRootTime => "state root time", + ResourceKind::DataAvailability => "data availability", + } + } +} + +/// Amount of resources required by the bundle being priced. +#[derive(Debug, Clone, Copy, Default)] +pub struct ResourceDemand { + pub gas_used: Option, + pub execution_time_us: Option, + pub state_root_time_us: Option, + pub data_availability_bytes: Option, +} + +impl ResourceDemand { + fn demand_for(&self, resource: ResourceKind) -> Option { + match resource { + ResourceKind::GasUsed => self.gas_used.map(|v| v as u128), + ResourceKind::ExecutionTime => self.execution_time_us, + ResourceKind::StateRootTime => self.state_root_time_us, + ResourceKind::DataAvailability => self.data_availability_bytes.map(|v| v as u128), + } + } +} + +/// Fee estimate for a single resource type. +/// +/// The estimation algorithm answers: "What priority fee would my bundle need to pay +/// to displace enough lower-paying transactions to free up the resources I need?" +#[derive(Debug, Clone)] +pub struct ResourceEstimate { + /// Minimum fee to displace enough capacity for the bundle's resource demand. + pub threshold_priority_fee: U256, + /// Recommended fee based on a percentile of transactions above the threshold. + /// Provides a safety margin over the bare minimum. + pub recommended_priority_fee: U256, + /// Total resource usage of transactions at or above the threshold fee. + pub cumulative_usage: u128, + /// Number of transactions at or above `threshold_priority_fee`. These higher-paying + /// transactions remain included alongside the bundle; lower-paying ones are displaced. + pub threshold_tx_count: usize, + /// Total transactions considered in the estimate. + pub total_transactions: usize, +} + +/// Per-resource fee estimates. +/// +/// Each field corresponds to a resource type. `None` indicates the resource +/// was not requested or could not be estimated (e.g., demand exceeds capacity). +#[derive(Debug, Clone, Default)] +pub struct ResourceEstimates { + pub gas_used: Option, + pub execution_time: Option, + pub state_root_time: Option, + pub data_availability: Option, +} + +impl ResourceEstimates { + /// Returns the estimate for the given resource kind. + pub fn get(&self, kind: ResourceKind) -> Option<&ResourceEstimate> { + match kind { + ResourceKind::GasUsed => self.gas_used.as_ref(), + ResourceKind::ExecutionTime => self.execution_time.as_ref(), + ResourceKind::StateRootTime => self.state_root_time.as_ref(), + ResourceKind::DataAvailability => self.data_availability.as_ref(), + } + } + + /// Sets the estimate for the given resource kind. + pub fn set(&mut self, kind: ResourceKind, estimate: ResourceEstimate) { + match kind { + ResourceKind::GasUsed => self.gas_used = Some(estimate), + ResourceKind::ExecutionTime => self.execution_time = Some(estimate), + ResourceKind::StateRootTime => self.state_root_time = Some(estimate), + ResourceKind::DataAvailability => self.data_availability = Some(estimate), + } + } + + /// Iterates over all present estimates with their resource kind. + pub fn iter(&self) -> impl Iterator { + [ + (ResourceKind::GasUsed, &self.gas_used), + (ResourceKind::ExecutionTime, &self.execution_time), + (ResourceKind::StateRootTime, &self.state_root_time), + (ResourceKind::DataAvailability, &self.data_availability), + ] + .into_iter() + .filter_map(|(kind, opt)| opt.as_ref().map(|est| (kind, est))) + } + + /// Returns true if no estimates are present. + pub fn is_empty(&self) -> bool { + self.iter().next().is_none() + } +} + +/// Estimates for a specific flashblock index. +#[derive(Debug, Clone)] +pub struct FlashblockResourceEstimates { + pub flashblock_index: u64, + pub estimates: ResourceEstimates, +} + +/// Aggregated estimates for a block. +#[derive(Debug, Clone)] +pub struct BlockPriorityEstimates { + pub block_number: u64, + pub flashblocks: Vec, + /// Minimum recommended fee across all flashblocks (easiest inclusion). + pub min_across_flashblocks: ResourceEstimates, + /// Maximum recommended fee across all flashblocks (most competitive). + pub max_across_flashblocks: ResourceEstimates, +} + +/// Rolling estimates aggregated across multiple recent blocks. +#[derive(Debug, Clone)] +pub struct RollingPriorityEstimates { + /// Number of blocks that contributed to this estimate. + pub blocks_sampled: usize, + /// Per-resource estimates (median across sampled blocks). + pub estimates: ResourceEstimates, + /// Single recommended fee: maximum across all resources. + pub recommended_priority_fee: U256, +} + +/// Computes resource fee estimates based on cached flashblock metering data. +pub struct PriorityFeeEstimator { + cache: Arc>, + percentile: f64, + limits: ResourceLimits, + default_priority_fee: U256, + /// Optional shared DA config from the miner RPC. When set, the estimator uses + /// `max_da_block_size` from this config instead of `limits.data_availability_bytes`. + /// This allows dynamic updates via `miner_setMaxDASize`. + da_config: Option, +} + +impl PriorityFeeEstimator { + /// Creates a new estimator referencing the shared metering cache. + /// + /// # Parameters + /// - `cache`: Shared cache containing recent flashblock metering data. + /// - `percentile`: Point in the fee distribution (among transactions above threshold) + /// to use for the recommended fee. + /// - `limits`: Configured resource capacity limits. + /// - `default_priority_fee`: Fee to return when a resource is not congested. + /// - `da_config`: Optional shared DA config for dynamic DA limit updates. + pub fn new( + cache: Arc>, + percentile: f64, + limits: ResourceLimits, + default_priority_fee: U256, + da_config: Option, + ) -> Self { + Self { + cache, + percentile, + limits, + default_priority_fee, + da_config, + } + } + + /// Returns the current DA block size limit, preferring the dynamic `OpDAConfig` value + /// if available, otherwise falling back to the static limit. + pub fn max_da_block_size(&self) -> Option { + self.da_config + .as_ref() + .and_then(|c| c.max_da_block_size()) + .or(self.limits.data_availability_bytes) + } + + /// Returns the limit for the given resource kind, using dynamic config where available. + fn limit_for(&self, resource: ResourceKind) -> Option { + match resource { + ResourceKind::DataAvailability => self.max_da_block_size().map(|v| v as u128), + _ => self.limits.limit_for(resource), + } + } + + /// Returns fee estimates for the provided block. If `block_number` is `None` + /// the most recent block in the cache is used. + /// + /// Returns `Ok(None)` if the cache is empty, the requested block is not cached, + /// or no transactions exist in the cached flashblocks. + /// + /// Returns `Err` if the bundle's demand exceeds any resource's capacity limit. + pub fn estimate_for_block( + &self, + block_number: Option, + demand: ResourceDemand, + ) -> Result, EstimateError> { + let cache_guard = self.cache.read(); + let block_metrics = match block_number { + Some(target) => cache_guard.block(target), + None => cache_guard.blocks_desc().next(), + }; + let Some(block_metrics) = block_metrics else { + return Ok(None); + }; + + let block_number = block_metrics.block_number; + + // Materialise sorted transactions per flashblock so we can drop the lock before + // running the estimation logic. + let mut flashblock_transactions = Vec::new(); + let mut total_tx_count = 0usize; + for flashblock in block_metrics.flashblocks() { + let sorted: Vec = flashblock + .transactions_sorted_by_priority_fee() + .into_iter() + .cloned() + .collect(); + if sorted.is_empty() { + continue; + } + total_tx_count += sorted.len(); + flashblock_transactions.push((flashblock.flashblock_index, sorted)); + } + drop(cache_guard); + + if flashblock_transactions.is_empty() { + return Ok(None); + } + + // Build the aggregate list for use-it-or-lose-it resources by collecting references + // to avoid cloning transactions twice. + let mut aggregate_refs: Vec<&MeteredTransaction> = Vec::with_capacity(total_tx_count); + for (_, txs) in &flashblock_transactions { + aggregate_refs.extend(txs.iter()); + } + aggregate_refs.sort_by(|a, b| a.priority_fee_per_gas.cmp(&b.priority_fee_per_gas)); + + let mut flashblock_estimates = Vec::new(); + + for (flashblock_index, txs) in &flashblock_transactions { + // Build a reference slice for this flashblock's transactions. + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + + let mut estimates = ResourceEstimates::default(); + for resource in ResourceKind::all() { + let Some(demand_value) = demand.demand_for(resource) else { + continue; + }; + let Some(limit_value) = self.limit_for(resource) else { + continue; + }; + + let transactions: &[&MeteredTransaction] = if resource.use_it_or_lose_it() { + &aggregate_refs + } else { + &txs_refs + }; + let estimate = compute_estimate( + resource, + transactions, + demand_value, + limit_value, + usage_extractor(resource), + self.percentile, + self.default_priority_fee, + )?; + + estimates.set(resource, estimate); + } + + flashblock_estimates.push(FlashblockResourceEstimates { + flashblock_index: *flashblock_index, + estimates, + }); + } + + let (min_across_flashblocks, max_across_flashblocks) = + compute_min_max_estimates(&flashblock_estimates); + + Ok(Some(BlockPriorityEstimates { + block_number, + flashblocks: flashblock_estimates, + min_across_flashblocks, + max_across_flashblocks, + })) + } + + /// Returns rolling fee estimates aggregated across the most recent blocks in the cache. + /// + /// For each resource, computes estimates per-block and takes the median recommended fee. + /// The final `recommended_priority_fee` is the maximum across all resources. + /// + /// Returns `Ok(None)` if the cache is empty or no blocks contain transaction data. + /// + /// Returns `Err` if the bundle's demand exceeds any resource's capacity limit. + pub fn estimate_rolling( + &self, + demand: ResourceDemand, + ) -> Result, EstimateError> { + let cache_guard = self.cache.read(); + let block_numbers: Vec = cache_guard.blocks_desc().map(|b| b.block_number).collect(); + drop(cache_guard); + + if block_numbers.is_empty() { + return Ok(None); + } + + // Collect per-block max estimates. Propagate any errors. + let mut block_estimates = Vec::new(); + for &n in &block_numbers { + if let Some(est) = self.estimate_for_block(Some(n), demand)? { + block_estimates.push(est.max_across_flashblocks); + } + } + + if block_estimates.is_empty() { + return Ok(None); + } + + // Compute median fee for each resource across blocks. + let mut estimates = ResourceEstimates::default(); + let mut max_fee = U256::ZERO; + + for resource in ResourceKind::all() { + let mut fees: Vec = block_estimates + .iter() + .filter_map(|e| e.get(resource)) + .map(|e| e.recommended_priority_fee) + .collect(); + + if fees.is_empty() { + continue; + } + + fees.sort(); + let median_fee = fees[fees.len() / 2]; + max_fee = max_fee.max(median_fee); + + estimates.set( + resource, + ResourceEstimate { + threshold_priority_fee: median_fee, + recommended_priority_fee: median_fee, + cumulative_usage: 0, + threshold_tx_count: 0, + total_transactions: 0, + }, + ); + } + + if estimates.is_empty() { + return Ok(None); + } + + Ok(Some(RollingPriorityEstimates { + blocks_sampled: block_numbers.len(), + estimates, + recommended_priority_fee: max_fee, + })) + } +} + +/// Core estimation algorithm (top-down approach). +/// +/// Given a list of transactions and a resource limit, determines the minimum priority +/// fee needed to be included alongside enough high-paying transactions while still +/// leaving room for the bundle's demand. +/// +/// # Algorithm +/// +/// 1. Sort transactions from highest to lowest priority fee. +/// 2. Walk from the top, subtracting each transaction's usage from remaining capacity. +/// 3. Stop when including another transaction would leave less capacity than the bundle needs. +/// 4. The threshold fee is the fee of the last included transaction (the minimum fee +/// among transactions that would be included alongside the bundle). +/// 5. If we include all transactions and still have capacity >= demand, the resource is +/// not congested, so return the configured default fee. +/// +/// # Example +/// +/// ```text +/// Resource limit: 30 gas +/// Bundle demand: 15 gas +/// +/// Transactions (sorted by priority_fee descending): +/// tx0: priority=10, gas=10 +/// tx1: priority=5, gas=10 +/// tx2: priority=2, gas=10 +/// +/// Walk from top: +/// remaining = 30 - 10 = 20 after tx0 (20 >= 15, can still fit bundle) +/// remaining = 20 - 10 = 10 after tx1 (10 < 15, can't fit bundle → stop before tx1) +/// +/// threshold_fee = 10 (tx0's fee, the last transaction we could include) +/// ``` +/// Returns `Err` if the bundle's demand exceeds the resource limit. +fn compute_estimate( + resource: ResourceKind, + transactions: &[&MeteredTransaction], + demand: u128, + limit: u128, + usage_fn: fn(&MeteredTransaction) -> u128, + percentile: f64, + default_fee: U256, +) -> Result { + // Bundle demand exceeds the resource limit entirely. + if demand > limit { + return Err(EstimateError::DemandExceedsCapacity { + resource, + demand, + limit, + }); + } + + // No transactions or zero demand means no competition for this resource. + if transactions.is_empty() || demand == 0 { + return Ok(ResourceEstimate { + threshold_priority_fee: default_fee, + recommended_priority_fee: default_fee, + cumulative_usage: 0, + threshold_tx_count: 0, + total_transactions: 0, + }); + } + + // Sort transactions by priority fee descending (highest first). + let mut sorted: Vec<_> = transactions.to_vec(); + sorted.sort_by(|a, b| b.priority_fee_per_gas.cmp(&a.priority_fee_per_gas)); + + // Walk from highest-paying transactions, subtracting usage from remaining capacity. + // Stop when we can no longer fit another transaction while leaving room for demand. + let mut remaining = limit; + let mut included_usage = 0u128; + let mut last_included_idx: Option = None; + + for (idx, tx) in sorted.iter().enumerate() { + let usage = usage_fn(tx); + + // Check if we can include this transaction and still have room for the bundle. + if remaining >= usage && remaining.saturating_sub(usage) >= demand { + remaining = remaining.saturating_sub(usage); + included_usage = included_usage.saturating_add(usage); + last_included_idx = Some(idx); + } else { + // Can't include this transaction without crowding out the bundle. + break; + } + } + + // If we included all transactions and still have room, resource is not congested. + let is_uncongested = last_included_idx == Some(sorted.len() - 1) && remaining >= demand; + + if is_uncongested { + return Ok(ResourceEstimate { + threshold_priority_fee: default_fee, + recommended_priority_fee: default_fee, + cumulative_usage: included_usage, + threshold_tx_count: sorted.len(), + total_transactions: sorted.len(), + }); + } + + let (supporting_count, threshold_fee, recommended_fee) = match last_included_idx { + Some(idx) => { + // At least one transaction fits alongside the bundle. + // The threshold is the fee of the last included transaction. + let threshold_fee = sorted[idx].priority_fee_per_gas; + + // For recommended fee, look at included transactions (those above threshold) + // and pick one at the specified percentile for a safety margin. + let included = &sorted[..=idx]; + let percentile = percentile.clamp(0.0, 1.0); + let recommended_fee = if included.len() <= 1 { + threshold_fee + } else { + // Pick from the higher end of included transactions for safety. + let pos = ((included.len() - 1) as f64 * (1.0 - percentile)).round() as usize; + included[pos.min(included.len() - 1)].priority_fee_per_gas + }; + + (idx + 1, threshold_fee, recommended_fee) + } + None => { + // No transactions fit - even the first transaction would crowd out + // the bundle. The bundle must beat the highest fee to be included. + // Report 0 supporting transactions since none were actually included. + let threshold_fee = sorted[0].priority_fee_per_gas; + (0, threshold_fee, threshold_fee) + } + }; + + Ok(ResourceEstimate { + threshold_priority_fee: threshold_fee, + recommended_priority_fee: recommended_fee, + cumulative_usage: included_usage, + threshold_tx_count: supporting_count, + total_transactions: sorted.len(), + }) +} + +/// Returns a function that extracts the relevant resource usage from a transaction. +fn usage_extractor(resource: ResourceKind) -> fn(&MeteredTransaction) -> u128 { + match resource { + ResourceKind::GasUsed => |tx: &MeteredTransaction| tx.gas_used as u128, + ResourceKind::ExecutionTime => |tx: &MeteredTransaction| tx.execution_time_us, + ResourceKind::StateRootTime => |tx: &MeteredTransaction| tx.state_root_time_us, + ResourceKind::DataAvailability => { + |tx: &MeteredTransaction| tx.data_availability_bytes as u128 + } + } +} + +/// Computes the minimum and maximum recommended fees across all flashblocks. +/// +/// Returns two `ResourceEstimates`: +/// - First: For each resource, the estimate with the lowest recommended fee (easiest inclusion). +/// - Second: For each resource, the estimate with the highest recommended fee (most competitive). +fn compute_min_max_estimates( + flashblocks: &[FlashblockResourceEstimates], +) -> (ResourceEstimates, ResourceEstimates) { + let mut min_estimates = ResourceEstimates::default(); + let mut max_estimates = ResourceEstimates::default(); + + for flashblock in flashblocks { + for (resource, estimate) in flashblock.estimates.iter() { + // Update min. + let current_min = min_estimates.get(resource); + if current_min.is_none() + || estimate.recommended_priority_fee < current_min.unwrap().recommended_priority_fee + { + min_estimates.set(resource, estimate.clone()); + } + + // Update max. + let current_max = max_estimates.get(resource); + if current_max.is_none() + || estimate.recommended_priority_fee > current_max.unwrap().recommended_priority_fee + { + max_estimates.set(resource, estimate.clone()); + } + } + } + + (min_estimates, max_estimates) +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::{B256, U256}; + use parking_lot::RwLock; + use std::sync::Arc; + + const DEFAULT_FEE: U256 = U256::from_limbs([1, 0, 0, 0]); // 1 wei + const DEFAULT_LIMITS: ResourceLimits = ResourceLimits { + gas_used: Some(25), + execution_time_us: Some(100), + state_root_time_us: None, + data_availability_bytes: Some(100), + }; + + fn tx(priority: u64, usage: u64) -> MeteredTransaction { + let mut hash_bytes = [0u8; 32]; + hash_bytes[24..].copy_from_slice(&priority.to_be_bytes()); + MeteredTransaction { + tx_hash: B256::new(hash_bytes), + priority_fee_per_gas: U256::from(priority), + gas_used: usage, + execution_time_us: usage as u128, + state_root_time_us: usage as u128, + data_availability_bytes: usage, + } + } + + #[test] + fn compute_estimate_congested_resource() { + // Limit: 30, Demand: 15 + // Transactions: priority=10 (10 gas), priority=5 (10 gas), priority=2 (10 gas) + // Walking from top (highest fee): + // - Include tx priority=10: remaining = 30-10 = 20 >= 15 ✓ + // - Include tx priority=5: remaining = 20-10 = 10 < 15 ✗ (stop) + // Threshold = 10 (the last included tx's fee) + let txs = vec![tx(10, 10), tx(5, 10), tx(2, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 15, + 30, // limit + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + assert_eq!(quote.threshold_priority_fee, U256::from(10)); + assert_eq!(quote.cumulative_usage, 10); // Only the first tx was included + assert_eq!(quote.threshold_tx_count, 1); + assert_eq!(quote.total_transactions, 3); + } + + #[test] + fn compute_estimate_uncongested_resource() { + // Limit: 100, Demand: 15 + // All transactions fit with room to spare → return default fee + let txs = vec![tx(10, 10), tx(5, 10), tx(2, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 15, + 100, // limit is much larger than total usage + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + assert_eq!(quote.threshold_priority_fee, DEFAULT_FEE); + assert_eq!(quote.recommended_priority_fee, DEFAULT_FEE); + assert_eq!(quote.cumulative_usage, 30); // All txs included + assert_eq!(quote.threshold_tx_count, 3); + } + + #[test] + fn compute_estimate_demand_exceeds_limit() { + // Demand > Limit → Error + let txs = vec![tx(10, 10), tx(5, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let result = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 50, // demand + 30, // limit + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ); + assert!(matches!( + result, + Err(EstimateError::DemandExceedsCapacity { + resource: ResourceKind::GasUsed, + demand: 50, + limit: 30, + }) + )); + } + + #[test] + fn compute_estimate_exact_fit() { + // Limit: 30, Demand: 20 + // Transactions: priority=10 (10 gas), priority=5 (10 gas) + // After including tx priority=10: remaining = 20 >= 20 ✓ + // After including tx priority=5: remaining = 10 < 20 ✗ + let txs = vec![tx(10, 10), tx(5, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 20, + 30, + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + assert_eq!(quote.threshold_priority_fee, U256::from(10)); + assert_eq!(quote.cumulative_usage, 10); + assert_eq!(quote.threshold_tx_count, 1); + } + + #[test] + fn compute_estimate_single_transaction() { + // Single tx that fits + let txs = vec![tx(10, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 15, + 30, + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + // After including the tx: remaining = 20 >= 15 ✓ + // But we only have 1 tx, so it's uncongested + assert_eq!(quote.threshold_priority_fee, DEFAULT_FEE); + assert_eq!(quote.recommended_priority_fee, DEFAULT_FEE); + } + + #[test] + fn compute_estimate_no_room_for_any_tx() { + // Limit: 25, Demand: 20 + // First tx uses 10, remaining = 15 < 20 → can't even include first tx + let txs = vec![tx(10, 10), tx(5, 10)]; + let txs_refs: Vec<&MeteredTransaction> = txs.iter().collect(); + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 20, + 25, + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + // Need to beat the highest fee since we couldn't include any tx + assert_eq!(quote.threshold_priority_fee, U256::from(10)); + assert_eq!(quote.cumulative_usage, 0); + assert_eq!(quote.threshold_tx_count, 0); // No transactions can coexist + } + + #[test] + fn compute_estimate_empty_transactions() { + // No transactions = uncongested, return default fee + let txs_refs: Vec<&MeteredTransaction> = vec![]; + let quote = compute_estimate( + ResourceKind::GasUsed, + &txs_refs, + 15, + 30, + usage_extractor(ResourceKind::GasUsed), + 0.5, + DEFAULT_FEE, + ) + .expect("no error"); + assert_eq!(quote.threshold_priority_fee, DEFAULT_FEE); + assert_eq!(quote.recommended_priority_fee, DEFAULT_FEE); + } + + fn setup_estimator( + limits: ResourceLimits, + ) -> (Arc>, PriorityFeeEstimator) { + let cache = Arc::new(RwLock::new(MeteringCache::new(4))); + let estimator = PriorityFeeEstimator::new(cache.clone(), 0.5, limits, DEFAULT_FEE, None); + (cache, estimator) + } + + #[test] + fn estimate_for_block_respects_limits() { + let (cache, estimator) = setup_estimator(DEFAULT_LIMITS); + { + let mut guard = cache.write(); + guard.upsert_transaction(1, 0, tx(10, 10)); + guard.upsert_transaction(1, 0, tx(5, 10)); + } + let mut demand = ResourceDemand::default(); + demand.gas_used = Some(15); + + let estimates = estimator + .estimate_for_block(Some(1), demand) + .expect("no error") + .expect("cached block"); + + assert_eq!(estimates.block_number, 1); + let gas_estimate = estimates + .max_across_flashblocks + .gas_used + .expect("gas estimate present"); + assert_eq!(gas_estimate.threshold_priority_fee, U256::from(10)); + } + + #[test] + fn estimate_for_block_propagates_limit_errors() { + let mut limits = DEFAULT_LIMITS; + limits.gas_used = Some(10); + let (cache, estimator) = setup_estimator(limits); + { + let mut guard = cache.write(); + guard.upsert_transaction(1, 0, tx(10, 10)); + guard.upsert_transaction(1, 0, tx(5, 10)); + } + let mut demand = ResourceDemand::default(); + demand.gas_used = Some(15); + + let err = estimator + .estimate_for_block(Some(1), demand) + .expect_err("demand should exceed capacity"); + assert!(matches!( + err, + EstimateError::DemandExceedsCapacity { + resource: ResourceKind::GasUsed, + demand: 15, + limit: 10 + } + )); + } + + #[test] + fn estimate_rolling_aggregates_across_blocks() { + let (cache, estimator) = setup_estimator(DEFAULT_LIMITS); + { + let mut guard = cache.write(); + // Block 1 → threshold 10 + guard.upsert_transaction(1, 0, tx(10, 10)); + guard.upsert_transaction(1, 0, tx(5, 10)); + // Block 2 → threshold 30 + guard.upsert_transaction(2, 0, tx(30, 10)); + guard.upsert_transaction(2, 0, tx(25, 10)); + } + + let mut demand = ResourceDemand::default(); + demand.gas_used = Some(15); + + let rolling = estimator + .estimate_rolling(demand) + .expect("no error") + .expect("estimates available"); + + assert_eq!(rolling.blocks_sampled, 2); + let gas_estimate = rolling.estimates.gas_used.expect("gas estimate present"); + // Median across [10, 30] = 30 (upper median for even count) + assert_eq!(gas_estimate.recommended_priority_fee, U256::from(30)); + assert_eq!(rolling.recommended_priority_fee, U256::from(30)); + } +} diff --git a/crates/metering/src/kafka.rs b/crates/metering/src/kafka.rs new file mode 100644 index 00000000..82d6a78a --- /dev/null +++ b/crates/metering/src/kafka.rs @@ -0,0 +1,185 @@ +use crate::MeteredTransaction; +use alloy_consensus::Transaction; +use alloy_consensus::transaction::Recovered; +use alloy_eips::Encodable2718; +use alloy_primitives::U256; +use chrono::Utc; +use eyre::Result; +use op_alloy_consensus::OpTxEnvelope; +use op_alloy_flz::tx_estimated_size_fjord_bytes; +use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer}; +use rdkafka::{ClientConfig, Message}; +use std::time::Duration; +use tips_core::types::AcceptedBundle; +use tokio::sync::mpsc::UnboundedSender; +use tokio::time::sleep; +use tracing::{debug, error, info, trace, warn}; + +/// Configuration required to connect to the Kafka topic publishing accepted bundles. +pub struct KafkaBundleConsumerConfig { + pub client_config: ClientConfig, + pub topic: String, +} + +/// Maximum backoff delay for Kafka receive errors. +const MAX_BACKOFF_SECS: u64 = 60; + +/// Consumes `AcceptedBundle` events from Kafka and publishes transaction-level metering data. +pub struct KafkaBundleConsumer { + consumer: StreamConsumer, + tx_sender: UnboundedSender, + topic: String, +} + +impl KafkaBundleConsumer { + pub fn new( + config: KafkaBundleConsumerConfig, + tx_sender: UnboundedSender, + ) -> Result { + let KafkaBundleConsumerConfig { + client_config, + topic, + } = config; + + let consumer: StreamConsumer = client_config.create()?; + consumer.subscribe(&[topic.as_str()])?; + + Ok(Self { + consumer, + tx_sender, + topic, + }) + } + + /// Starts listening for Kafka messages until the task is cancelled. + pub async fn run(self) { + info!( + target: "metering::kafka", + topic = %self.topic, + "Starting Kafka bundle consumer" + ); + + let mut backoff_secs = 1u64; + + loop { + match self.consumer.recv().await { + Ok(message) => { + // Reset backoff on successful receive. + backoff_secs = 1; + if let Err(err) = self.handle_message(message).await { + error!(target: "metering::kafka", error = %err, "Failed to process Kafka message"); + metrics::counter!("metering.kafka.errors_total").increment(1); + } + } + Err(err) => { + error!( + target: "metering::kafka", + error = %err, + backoff_secs, + "Kafka receive error for topic {}. Retrying after backoff", + self.topic + ); + metrics::counter!("metering.kafka.errors_total").increment(1); + sleep(Duration::from_secs(backoff_secs)).await; + backoff_secs = (backoff_secs * 2).min(MAX_BACKOFF_SECS); + } + } + } + } + + async fn handle_message(&self, message: rdkafka::message::BorrowedMessage<'_>) -> Result<()> { + let payload = message + .payload() + .ok_or_else(|| eyre::eyre!("Kafka message missing payload"))?; + + let bundle: AcceptedBundle = serde_json::from_slice(payload)?; + metrics::counter!("metering.kafka.messages_total").increment(1); + + if let Some(ts) = message.timestamp().to_millis() { + let now_ms = Utc::now().timestamp_millis(); + let lag_ms = now_ms.saturating_sub(ts); + metrics::gauge!("metering.kafka.lag_ms").set(lag_ms as f64); + } + + debug!( + target: "metering::kafka", + block_number = bundle.block_number, + uuid = %bundle.uuid(), + tx_count = bundle.txs.len(), + "Received accepted bundle from Kafka" + ); + + self.publish_transactions(&bundle)?; + + // Best-effort asynchronous commit. + if let Err(err) = self.consumer.commit_message(&message, CommitMode::Async) { + warn!( + target: "metering::kafka", + error = %err, + "Failed to commit Kafka offset asynchronously" + ); + metrics::counter!("metering.kafka.errors_total").increment(1); + } + + Ok(()) + } + + fn publish_transactions(&self, bundle: &AcceptedBundle) -> Result<()> { + if bundle.txs.len() != bundle.meter_bundle_response.results.len() { + warn!( + target: "metering::kafka", + bundle_uuid = %bundle.uuid(), + tx_count = bundle.txs.len(), + result_count = bundle.meter_bundle_response.results.len(), + "Bundle transactions/results length mismatch; skipping" + ); + metrics::counter!("metering.kafka.messages_skipped").increment(1); + return Ok(()); + } + + for (tx, result) in bundle + .txs + .iter() + .zip(bundle.meter_bundle_response.results.iter()) + { + let priority_fee_per_gas = calculate_priority_fee(tx); + let data_availability_bytes = tx_estimated_size_fjord_bytes(&tx.encoded_2718()); + + // TODO(metering): Populate state_root_time_us once the TIPS Kafka schema + // includes per-transaction state-root timing. + let metered_tx = MeteredTransaction { + tx_hash: tx.tx_hash(), + priority_fee_per_gas, + gas_used: result.gas_used, + execution_time_us: result.execution_time_us, + state_root_time_us: 0, + data_availability_bytes, + }; + + if let Err(err) = self.tx_sender.send(metered_tx) { + warn!( + target: "metering::kafka", + error = %err, + tx_hash = %tx.tx_hash(), + "Failed to send metered transaction event" + ); + metrics::counter!("metering.kafka.errors_total").increment(1); + } + } + + trace!( + target: "metering::kafka", + bundle_uuid = %bundle.uuid(), + transactions = bundle.txs.len(), + "Published metering events for bundle" + ); + + Ok(()) + } +} + +fn calculate_priority_fee(tx: &Recovered) -> U256 { + tx.max_priority_fee_per_gas() + .map(U256::from) + .unwrap_or_else(|| U256::from(tx.max_fee_per_gas())) +} diff --git a/crates/metering/src/lib.rs b/crates/metering/src/lib.rs index 138a3c7d..9739b209 100644 --- a/crates/metering/src/lib.rs +++ b/crates/metering/src/lib.rs @@ -1,8 +1,25 @@ +mod annotator; +mod cache; +mod estimator; +mod kafka; mod meter; mod rpc; #[cfg(test)] mod tests; +pub use annotator::{FlashblockInclusion, ResourceAnnotator}; +pub use cache::{ + BlockMetrics, FlashblockMetrics, MeteredTransaction, MeteringCache, ResourceTotals, +}; +pub use estimator::{ + BlockPriorityEstimates, EstimateError, FlashblockResourceEstimates, PriorityFeeEstimator, + ResourceDemand, ResourceEstimate, ResourceEstimates, ResourceKind, ResourceLimits, + RollingPriorityEstimates, +}; +pub use kafka::{KafkaBundleConsumer, KafkaBundleConsumerConfig}; pub use meter::meter_bundle; -pub use rpc::{MeteringApiImpl, MeteringApiServer}; +pub use reth_optimism_payload_builder::config::OpDAConfig; +pub use rpc::{ + MeteredPriorityFeeResponse, MeteringApiImpl, MeteringApiServer, ResourceFeeEstimateResponse, +}; pub use tips_core::types::{Bundle, MeterBundleResponse, TransactionResult}; diff --git a/crates/metering/src/rpc.rs b/crates/metering/src/rpc.rs index 98059fd3..e1563275 100644 --- a/crates/metering/src/rpc.rs +++ b/crates/metering/src/rpc.rs @@ -1,29 +1,70 @@ use alloy_consensus::Header; -use alloy_eips::BlockNumberOrTag; +use alloy_eips::{BlockNumberOrTag, Encodable2718}; use alloy_primitives::U256; use jsonrpsee::{ core::{RpcResult, async_trait}, proc_macros::rpc, + types::{ErrorCode, ErrorObjectOwned}, }; +use op_alloy_flz::tx_estimated_size_fjord_bytes; use reth::providers::BlockReaderIdExt; use reth_optimism_chainspec::OpChainSpec; use reth_provider::{ChainSpecProvider, StateProviderFactory}; +use std::sync::Arc; use tips_core::types::{Bundle, MeterBundleResponse, ParsedBundle}; -use tracing::{error, info}; +use tracing::{debug, error, info}; -use crate::meter_bundle; +use crate::{ + PriorityFeeEstimator, ResourceDemand, ResourceEstimate, ResourceEstimates, ResourceKind, + RollingPriorityEstimates, meter_bundle, +}; + +/// Human-friendly representation of a resource fee quote. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct ResourceFeeEstimateResponse { + pub resource: String, + pub threshold_priority_fee: String, + pub recommended_priority_fee: String, + pub cumulative_usage: String, + pub threshold_tx_count: u64, + pub total_transactions: u64, +} + +/// Response payload for `base_meteredPriorityFeePerGas`. +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct MeteredPriorityFeeResponse { + #[serde(flatten)] + pub meter_bundle: MeterBundleResponse, + /// Single recommended priority fee (max across all resources and median across recent blocks). + pub recommended_priority_fee: String, + /// Number of recent blocks used to compute the rolling estimate. + pub blocks_sampled: u64, + /// Per-resource estimates (median across sampled blocks). + pub resource_estimates: Vec, +} -/// RPC API for transaction metering +/// RPC API for transaction metering. #[rpc(server, namespace = "base")] pub trait MeteringApi { - /// Simulates and meters a bundle of transactions + /// Simulates and meters a bundle of transactions. #[method(name = "meterBundle")] async fn meter_bundle(&self, bundle: Bundle) -> RpcResult; + + /// Estimates the priority fee necessary for a bundle to be included in recently observed + /// flashblocks, considering multiple resource constraints. + #[method(name = "meteredPriorityFeePerGas")] + async fn metered_priority_fee_per_gas( + &self, + bundle: Bundle, + ) -> RpcResult; } -/// Implementation of the metering RPC API +/// Implementation of the metering RPC API. pub struct MeteringApiImpl { provider: Provider, + priority_fee_estimator: Arc, } impl MeteringApiImpl @@ -33,88 +74,80 @@ where + BlockReaderIdExt
+ Clone, { - /// Creates a new instance of MeteringApi - pub fn new(provider: Provider) -> Self { - Self { provider } + /// Creates a new instance of the metering API backed by the given provider and estimator. + pub fn new(provider: Provider, priority_fee_estimator: Arc) -> Self { + Self { + provider, + priority_fee_estimator, + } } -} -#[async_trait] -impl MeteringApiServer for MeteringApiImpl -where - Provider: StateProviderFactory - + ChainSpecProvider - + BlockReaderIdExt
- + Clone - + Send - + Sync - + 'static, -{ - async fn meter_bundle(&self, bundle: Bundle) -> RpcResult { + fn run_metering( + &self, + bundle: Bundle, + ) -> Result<(MeterBundleResponse, ResourceDemand), ErrorObjectOwned> { info!( num_transactions = &bundle.txs.len(), block_number = &bundle.block_number, "Starting bundle metering" ); - // Get the latest header let header = self .provider .sealed_header_by_number_or_tag(BlockNumberOrTag::Latest) .map_err(|e| { - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InternalError.code(), - format!("Failed to get latest header: {}", e), + ErrorObjectOwned::owned( + ErrorCode::InternalError.code(), + format!("Failed to get latest header: {e}"), None::<()>, ) })? .ok_or_else(|| { - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InternalError.code(), + ErrorObjectOwned::owned( + ErrorCode::InternalError.code(), "Latest block not found".to_string(), None::<()>, ) })?; let parsed_bundle = ParsedBundle::try_from(bundle).map_err(|e| { - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InvalidParams.code(), - format!("Failed to parse bundle: {}", e), + ErrorObjectOwned::owned( + ErrorCode::InvalidParams.code(), + format!("Failed to parse bundle: {e}"), None::<()>, ) })?; - // Get state provider for the block + let da_usage: u64 = parsed_bundle + .txs + .iter() + .map(|tx| tx_estimated_size_fjord_bytes(&tx.encoded_2718())) + .sum(); + let state_provider = self .provider .state_by_block_hash(header.hash()) .map_err(|e| { error!(error = %e, "Failed to get state provider"); - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InternalError.code(), - format!("Failed to get state provider: {}", e), + ErrorObjectOwned::owned( + ErrorCode::InternalError.code(), + format!("Failed to get state provider: {e}"), None::<()>, ) })?; - // Meter bundle using utility function + let chain_spec = self.provider.chain_spec().clone(); + let (results, total_gas_used, total_gas_fees, bundle_hash, total_execution_time) = - meter_bundle( - state_provider, - self.provider.chain_spec().clone(), - parsed_bundle, - &header, - ) - .map_err(|e| { + meter_bundle(state_provider, chain_spec, parsed_bundle, &header).map_err(|e| { error!(error = %e, "Bundle metering failed"); - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InternalError.code(), - format!("Bundle metering failed: {}", e), + ErrorObjectOwned::owned( + ErrorCode::InternalError.code(), + format!("Bundle metering failed: {e}"), None::<()>, ) })?; - // Calculate average gas price let bundle_gas_price = if total_gas_used > 0 { (total_gas_fees / U256::from(total_gas_used)).to_string() } else { @@ -129,7 +162,7 @@ where "Bundle metering completed successfully" ); - Ok(MeterBundleResponse { + let response = MeterBundleResponse { bundle_gas_price, bundle_hash, coinbase_diff: total_gas_fees.to_string(), @@ -140,6 +173,101 @@ where state_flashblock_index: None, total_gas_used, total_execution_time_us: total_execution_time, - }) + }; + + let resource_demand = ResourceDemand { + gas_used: Some(total_gas_used), + execution_time_us: Some(total_execution_time), + state_root_time_us: None, // Populated when state-root metrics become available. + data_availability_bytes: Some(da_usage), + }; + + Ok((response, resource_demand)) + } + + fn build_priority_fee_response( + meter_bundle: MeterBundleResponse, + estimates: RollingPriorityEstimates, + ) -> MeteredPriorityFeeResponse { + let resource_estimates = resource_estimates_to_vec(&estimates.estimates); + + MeteredPriorityFeeResponse { + meter_bundle, + recommended_priority_fee: estimates.recommended_priority_fee.to_string(), + blocks_sampled: estimates.blocks_sampled as u64, + resource_estimates, + } + } +} + +#[async_trait] +impl MeteringApiServer for MeteringApiImpl +where + Provider: StateProviderFactory + + ChainSpecProvider + + BlockReaderIdExt
+ + Clone + + Send + + Sync + + 'static, +{ + async fn meter_bundle(&self, bundle: Bundle) -> RpcResult { + let (response, _) = self.run_metering(bundle)?; + Ok(response) + } + + async fn metered_priority_fee_per_gas( + &self, + bundle: Bundle, + ) -> RpcResult { + let (meter_bundle, resource_demand) = self.run_metering(bundle)?; + + debug!(?resource_demand, "Computing priority fee estimates"); + + let estimates = self + .priority_fee_estimator + .estimate_rolling(resource_demand) + .map_err(|e| { + ErrorObjectOwned::owned(ErrorCode::InvalidParams.code(), e.to_string(), None::<()>) + })? + .ok_or_else(|| { + ErrorObjectOwned::owned( + ErrorCode::InternalError.code(), + "Priority fee data unavailable".to_string(), + None::<()>, + ) + })?; + + let response = Self::build_priority_fee_response(meter_bundle, estimates); + Ok(response) + } +} + +fn resource_estimates_to_vec(estimates: &ResourceEstimates) -> Vec { + estimates + .iter() + .map(|(kind, est)| to_response(kind, est)) + .collect() +} + +fn to_response(resource: ResourceKind, estimate: &ResourceEstimate) -> ResourceFeeEstimateResponse { + ResourceFeeEstimateResponse { + resource: resource.as_camel_case().to_string(), + threshold_priority_fee: estimate.threshold_priority_fee.to_string(), + recommended_priority_fee: estimate.recommended_priority_fee.to_string(), + cumulative_usage: estimate.cumulative_usage.to_string(), + threshold_tx_count: estimate.threshold_tx_count.try_into().unwrap_or(u64::MAX), + total_transactions: estimate.total_transactions.try_into().unwrap_or(u64::MAX), + } +} + +impl ResourceKind { + fn as_camel_case(&self) -> &'static str { + match self { + ResourceKind::GasUsed => "gasUsed", + ResourceKind::ExecutionTime => "executionTime", + ResourceKind::StateRootTime => "stateRootTime", + ResourceKind::DataAvailability => "dataAvailability", + } } } diff --git a/crates/metering/src/tests/annotator.rs b/crates/metering/src/tests/annotator.rs new file mode 100644 index 00000000..19429245 --- /dev/null +++ b/crates/metering/src/tests/annotator.rs @@ -0,0 +1,128 @@ +#![allow(clippy::unwrap_used)] + +use super::utils::init_tracing; +use crate::{FlashblockInclusion, MeteredTransaction, MeteringCache, ResourceAnnotator}; +use alloy_primitives::{B256, U256}; +use parking_lot::RwLock; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::time::Duration; + +fn test_transaction(tx_hash: B256, priority: u64) -> MeteredTransaction { + MeteredTransaction { + tx_hash, + priority_fee_per_gas: U256::from(priority), + gas_used: 21_000, + execution_time_us: 500, + state_root_time_us: 0, + data_availability_bytes: 120, + } +} + +#[tokio::test] +async fn annotator_updates_cache_on_flashblock() { + init_tracing(); + + let cache = Arc::new(RwLock::new(MeteringCache::new(12))); + let (tx_sender, tx_receiver) = mpsc::unbounded_channel::(); + let (flash_sender, flash_receiver) = mpsc::unbounded_channel::(); + + let annotator = ResourceAnnotator::new(cache.clone(), tx_receiver, flash_receiver); + + let handle = tokio::spawn(annotator.run()); + + // Send a metered transaction (goes to pending map). + let tx_hash = B256::random(); + let transaction = test_transaction(tx_hash, 5); + tx_sender.send(transaction.clone()).unwrap(); + + // Give time for the event to be processed. + tokio::time::sleep(Duration::from_millis(25)).await; + + // Cache should still be empty - tx is in pending, not cache. + assert!(cache.read().is_empty()); + + // Now send the flashblock inclusion event with the actual location. + flash_sender + .send(FlashblockInclusion { + block_number: 1, + flashblock_index: 0, + ordered_tx_hashes: vec![tx_hash], + }) + .unwrap(); + + // Give time for the flashblock event to be processed. + tokio::time::sleep(Duration::from_millis(25)).await; + + // Now the cache should have the transaction at the correct location. + let cache_read = cache.read(); + let block = cache_read.block(1).expect("block inserted"); + assert_eq!(block.flashblock_count(), 1); + let flashblock = block.flashblock(0).expect("flashblock present"); + assert_eq!(flashblock.len(), 1); + assert_eq!( + flashblock.transactions().next().unwrap().tx_hash, + transaction.tx_hash + ); + + drop(cache_read); + drop(tx_sender); + drop(flash_sender); + + handle.await.unwrap(); +} + +#[tokio::test] +async fn annotator_ignores_unknown_tx_hashes() { + init_tracing(); + + let cache = Arc::new(RwLock::new(MeteringCache::new(12))); + let (tx_sender, tx_receiver) = mpsc::unbounded_channel::(); + let (flash_sender, flash_receiver) = mpsc::unbounded_channel::(); + + let annotator = ResourceAnnotator::new(cache.clone(), tx_receiver, flash_receiver); + let handle = tokio::spawn(annotator.run()); + + // Send a metered transaction. + let tx_hash = B256::random(); + tx_sender.send(test_transaction(tx_hash, 8)).unwrap(); + + tokio::time::sleep(Duration::from_millis(20)).await; + + // Send flashblock inclusion with a *different* tx hash (not in pending). + let unknown_hash = B256::random(); + flash_sender + .send(FlashblockInclusion { + block_number: 10, + flashblock_index: 2, + ordered_tx_hashes: vec![unknown_hash], + }) + .unwrap(); + + tokio::time::sleep(Duration::from_millis(20)).await; + + // Cache should be empty since the tx hash didn't match. + assert!(cache.read().is_empty()); + + // The original tx is still in pending (not matched yet). + // Send a flashblock that matches it. + flash_sender + .send(FlashblockInclusion { + block_number: 10, + flashblock_index: 3, + ordered_tx_hashes: vec![tx_hash], + }) + .unwrap(); + + tokio::time::sleep(Duration::from_millis(20)).await; + + // Now it should be in the cache. + let cache_read = cache.read(); + let block = cache_read.block(10).expect("block inserted"); + assert_eq!(block.flashblock(3).unwrap().len(), 1); + + drop(cache_read); + drop(tx_sender); + drop(flash_sender); + handle.await.unwrap(); +} diff --git a/crates/metering/src/tests/mod.rs b/crates/metering/src/tests/mod.rs index 80d28813..796ddaad 100644 --- a/crates/metering/src/tests/mod.rs +++ b/crates/metering/src/tests/mod.rs @@ -1,4 +1,6 @@ #[cfg(test)] +mod annotator; +#[cfg(test)] mod meter; #[cfg(test)] mod rpc; diff --git a/crates/metering/src/tests/rpc.rs b/crates/metering/src/tests/rpc.rs index 2f5778c8..4f059a13 100644 --- a/crates/metering/src/tests/rpc.rs +++ b/crates/metering/src/tests/rpc.rs @@ -1,20 +1,32 @@ #[cfg(test)] mod tests { - use crate::rpc::{MeteringApiImpl, MeteringApiServer}; + use crate::{ + MeteredTransaction, MeteringApiImpl, MeteringApiServer, MeteringCache, + PriorityFeeEstimator, ResourceLimits, + }; + + const PRIORITY_FEE_PERCENTILE: f64 = 0.5; + const GAS_LIMIT: u64 = 30_000_000; + const EXECUTION_TIME_US: u128 = 50_000; + const DA_BYTES: u64 = 120_000; + const UNCONGESTED_PRIORITY_FEE: u64 = 1; use alloy_eips::Encodable2718; use alloy_genesis::Genesis; use alloy_primitives::bytes; use alloy_primitives::{Bytes, U256, address, b256}; use alloy_rpc_client::RpcClient; use op_alloy_consensus::OpTxEnvelope; + use parking_lot::RwLock; use reth::args::{DiscoveryArgs, NetworkArgs, RpcServerArgs}; use reth::builder::{Node, NodeBuilder, NodeConfig, NodeHandle}; use reth::chainspec::Chain; use reth::core::exit::NodeExitFuture; use reth::tasks::TaskManager; + use reth_rpc_server_types::{RethRpcModule, RpcModuleSelection}; use reth_optimism_chainspec::OpChainSpecBuilder; use reth_optimism_node::OpNode; use reth_optimism_node::args::RollupArgs; + use reth_optimism_payload_builder::config::OpDAConfig; use reth_optimism_primitives::OpTransactionSigned; use reth_provider::providers::BlockchainProvider; use reth_transaction_pool::test_utils::TransactionBuilder; @@ -26,6 +38,7 @@ mod tests { pub struct NodeContext { http_api_addr: SocketAddr, + cache: Arc>, _node_exit_future: NodeExitFuture, _node: Box, } @@ -77,10 +90,35 @@ mod tests { let node_config = NodeConfig::new(chain_spec.clone()) .with_network(network_config.clone()) - .with_rpc(RpcServerArgs::default().with_unused_ports().with_http()) + .with_rpc( + RpcServerArgs::default() + .with_unused_ports() + .with_http() + .with_http_api(RpcModuleSelection::from([RethRpcModule::Miner])), + ) .with_unused_ports(); - let node = OpNode::new(RollupArgs::default()); + // Create shared DA config that will be used by both the miner RPC and the estimator. + // When miner_setMaxDASize is called, the OpDAConfig is updated atomically and + // the estimator will see the new limits. + let da_config = OpDAConfig::default(); + let node = OpNode::new(RollupArgs::default()).with_da_config(da_config.clone()); + + let cache = Arc::new(RwLock::new(MeteringCache::new(12))); + let limits = ResourceLimits { + gas_used: Some(GAS_LIMIT), + execution_time_us: Some(EXECUTION_TIME_US), + state_root_time_us: None, + data_availability_bytes: Some(DA_BYTES), + }; + let estimator = Arc::new(PriorityFeeEstimator::new( + cache.clone(), + PRIORITY_FEE_PERCENTILE, + limits, + U256::from(UNCONGESTED_PRIORITY_FEE), + Some(da_config.clone()), + )); + let estimator_for_rpc = estimator.clone(); let NodeHandle { node, @@ -91,7 +129,8 @@ mod tests { .with_components(node.components_builder()) .with_add_ons(node.add_ons()) .extend_rpc_modules(move |ctx| { - let metering_api = MeteringApiImpl::new(ctx.provider().clone()); + let metering_api = + MeteringApiImpl::new(ctx.provider().clone(), estimator_for_rpc.clone()); ctx.modules.merge_configured(metering_api.into_rpc())?; Ok(()) }) @@ -105,6 +144,7 @@ mod tests { Ok(NodeContext { http_api_addr, + cache, _node_exit_future: node_exit_future, _node: Box::new(node), }) @@ -439,4 +479,131 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_metered_priority_fee_requires_history() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let client = node.rpc_client().await?; + + let bundle = create_bundle(vec![], 0, None); + + let response: Result = client + .request("base_meteredPriorityFeePerGas", (bundle,)) + .await; + + assert!(response.is_err()); + let err = response.unwrap_err(); + let err_str = format!("{err}"); + assert!( + err_str.contains("Priority fee data unavailable"), + "unexpected error: {err_str}" + ); + + Ok(()) + } + + /// Creates a test transaction with specified priority fee and DA bytes. + fn test_tx(priority_fee: u64, da_bytes: u64) -> MeteredTransaction { + let mut hash_bytes = [0u8; 32]; + hash_bytes[24..].copy_from_slice(&priority_fee.to_be_bytes()); + MeteredTransaction { + tx_hash: alloy_primitives::B256::new(hash_bytes), + priority_fee_per_gas: U256::from(priority_fee), + gas_used: 21_000, + execution_time_us: 100, + state_root_time_us: 0, + data_availability_bytes: da_bytes, + } + } + + #[tokio::test] + async fn test_set_max_da_size_updates_priority_fee_estimates() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let client = node.rpc_client().await?; + + // Create a transaction to include in the bundle for DA demand calculation. + // Use a funded account from genesis.json (Hardhat account #0). + let sender_secret = + b256!("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"); + + let tx = TransactionBuilder::default() + .signer(sender_secret) + .chain_id(84532) + .nonce(0) + .to(address!("0x1111111111111111111111111111111111111111")) + .value(1000) + .gas_limit(21_000) + .max_fee_per_gas(1_000_000_000) + .max_priority_fee_per_gas(1_000_000_000) + .into_eip1559(); + + let signed_tx = + OpTransactionSigned::Eip1559(tx.as_eip1559().expect("eip1559 transaction").clone()); + let envelope: OpTxEnvelope = signed_tx.into(); + let tx_bytes = Bytes::from(envelope.encoded_2718()); + + // Populate the cache with test transactions that have known DA bytes. + // We'll create a scenario where DA is the constraining resource: + // - tx1: priority=100, DA=50 bytes + // - tx2: priority=50, DA=50 bytes + // - tx3: priority=10, DA=50 bytes + // Total DA used by existing transactions = 150 bytes + { + let mut cache = node.cache.write(); + cache.upsert_transaction(1, 0, test_tx(100, 50)); + cache.upsert_transaction(1, 0, test_tx(50, 50)); + cache.upsert_transaction(1, 0, test_tx(10, 50)); + } + + // Bundle with our transaction - it will have some DA demand from the tx bytes. + let bundle = create_bundle(vec![tx_bytes], 0, None); + + // With default DA limit (120KB = 120_000 bytes), there's plenty of room. + // All transactions fit (150 bytes used) plus our bundle's demand. + // This should return the uncongested default fee (1 wei). + // + // Note: We use serde_json::Value because alloy_rpc_client can't deserialize u128 fields + // when they're nested in flattened structs. The response contains totalExecutionTimeUs + // as u128 which causes deserialization issues. + let response: serde_json::Value = client + .request("base_meteredPriorityFeePerGas", (bundle.clone(),)) + .await?; + + let fee_before = response["recommendedPriorityFee"] + .as_str() + .expect("recommendedPriorityFee should be a string"); + assert_eq!( + fee_before, "1", + "with large DA limit, resource should be uncongested" + ); + + // Now reduce the DA limit to 200 bytes via miner_setMaxDASize. + // With 200 byte limit and ~100 byte bundle demand, only ~100 bytes available for others. + // tx1 (50) + tx2 (50) = 100 bytes fits, but adding tx3 (50) = 150 bytes exceeds capacity. + // So tx3 gets displaced. Threshold fee = tx2's fee = 50. + let result: bool = client.request("miner_setMaxDASize", (1000, 200)).await?; + assert!(result, "miner_setMaxDASize should succeed"); + + // Request priority fee again - now DA should be congested. + let response: serde_json::Value = client + .request("base_meteredPriorityFeePerGas", (bundle,)) + .await?; + + // With the reduced limit, we should see a higher recommended fee. + // The exact value depends on the percentile calculation, but it should + // be significantly higher than the uncongested fee of 1. + let fee_after = response["recommendedPriorityFee"] + .as_str() + .expect("recommendedPriorityFee should be a string"); + let fee: u64 = fee_after.parse().expect("valid u64"); + assert!( + fee > 1, + "with reduced DA limit, recommended fee should be higher than uncongested fee (1), got {}", + fee + ); + + Ok(()) + } } diff --git a/crates/metering/src/tests/utils.rs b/crates/metering/src/tests/utils.rs index 7bd29fef..da46f5fe 100644 --- a/crates/metering/src/tests/utils.rs +++ b/crates/metering/src/tests/utils.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Once}; use reth::api::{NodeTypes, NodeTypesWithDBAdapter}; use reth_db::{ @@ -20,6 +20,14 @@ pub fn create_provider_factory( ) } +static TRACING_INIT: Once = Once::new(); + +pub fn init_tracing() { + TRACING_INIT.call_once(|| { + let _ = tracing_subscriber::fmt::try_init(); + }); +} + fn create_test_db() -> Arc> { let path = tempdir_path(); let emsg = format!("{ERROR_DB_CREATION}: {path:?}"); diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 9e3a2adf..5b711c84 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -28,6 +28,7 @@ reth-rpc-convert.workspace = true reth-optimism-rpc.workspace = true reth-optimism-evm.workspace = true reth-optimism-chainspec.workspace = true +reth-rpc-server-types.workspace = true reth-cli-util.workspace = true # revm @@ -86,6 +87,9 @@ uuid.workspace = true time.workspace = true chrono.workspace = true once_cell.workspace = true +parking_lot.workspace = true +rdkafka.workspace = true +tips-core.workspace = true [features] default = [] diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index b8dbb3ce..05c6478a 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -1,27 +1,59 @@ -use base_reth_flashblocks_rpc::rpc::EthApiExt; -use futures_util::TryStreamExt; -use once_cell::sync::OnceCell; -use reth::version::{ - RethCliVersionConsts, default_reth_version_metadata, try_init_version_metadata, +use alloy_primitives::{TxHash, keccak256}; +use base_reth_flashblocks_rpc::rpc::{EthApiExt, EthApiOverrideServer}; +use base_reth_flashblocks_rpc::state::FlashblocksState; +use base_reth_flashblocks_rpc::subscription::{Flashblock, FlashblocksSubscriber}; +use base_reth_metering::{ + FlashblockInclusion, KafkaBundleConsumer, KafkaBundleConsumerConfig, MeteredTransaction, + MeteringApiImpl, MeteringApiServer, MeteringCache, OpDAConfig, PriorityFeeEstimator, + ResourceAnnotator, ResourceLimits, }; -use reth_exex::ExExEvent; -use std::sync::Arc; -use base_reth_flashblocks_rpc::rpc::EthApiOverrideServer; -use base_reth_flashblocks_rpc::state::FlashblocksState; -use base_reth_flashblocks_rpc::subscription::FlashblocksSubscriber; -use base_reth_metering::{MeteringApiImpl, MeteringApiServer}; +/// Default percentile applied when selecting a recommended priority fee among +/// transactions already scheduled above the inclusion threshold. +const DEFAULT_PRIORITY_FEE_PERCENTILE: f64 = 0.5; + +/// Default gas limit per flashblock (30M gas). +const DEFAULT_GAS_LIMIT: u64 = 30_000_000; + +/// Default execution time budget per block in microseconds (50ms). +const DEFAULT_EXECUTION_TIME_US: u128 = 50_000; + +/// Default data availability limit per flashblock in bytes (120KB). +const DEFAULT_DA_BYTES: u64 = 120_000; + +/// Default state root time budget per block in microseconds (disabled by default). +const DEFAULT_STATE_ROOT_TIME_US: Option = None; + +/// Default priority fee returned when a resource is not congested (1 wei). +const DEFAULT_UNCONGESTED_PRIORITY_FEE: u64 = 1; + +/// Default number of recent blocks retained in the metering cache. +const DEFAULT_METERING_CACHE_SIZE: usize = 12; + use base_reth_transaction_tracing::transaction_tracing_exex; use clap::Parser; +use eyre::bail; +use futures_util::TryStreamExt; +use once_cell::sync::OnceCell; +use parking_lot::RwLock; +use rdkafka::config::ClientConfig; use reth::builder::{Node, NodeHandle}; +use reth::version::{ + RethCliVersionConsts, default_reth_version_metadata, try_init_version_metadata, +}; use reth::{ builder::{EngineNodeLauncher, TreeConfig}, providers::providers::BlockchainProvider, }; +use reth_exex::ExExEvent; use reth_optimism_cli::{Cli, chainspec::OpChainSpecParser}; use reth_optimism_node::OpNode; use reth_optimism_node::args::RollupArgs; -use tracing::info; +use reth_rpc_server_types::{RethRpcModule, RpcModuleSelection}; +use std::sync::Arc; +use tips_core::kafka::load_kafka_config_from_file; +use tokio::sync::mpsc; +use tracing::{error, info, warn}; use url::Url; pub const NODE_RETH_CLIENT_VERSION: &str = concat!("base/v", env!("CARGO_PKG_VERSION")); @@ -29,7 +61,7 @@ pub const NODE_RETH_CLIENT_VERSION: &str = concat!("base/v", env!("CARGO_PKG_VER #[global_allocator] static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator(); -#[derive(Debug, Clone, PartialEq, Eq, clap::Args)] +#[derive(Debug, Clone, PartialEq, clap::Args)] #[command(next_help_heading = "Rollup")] struct Args { #[command(flatten)] @@ -55,12 +87,161 @@ struct Args { /// Enable metering RPC for transaction bundle simulation #[arg(long = "enable-metering", value_name = "ENABLE_METERING")] pub enable_metering: bool, + + /// Kafka brokers (comma-separated) publishing accepted bundle events for metering. + #[arg(long = "metering-kafka-brokers")] + pub metering_kafka_brokers: Option, + + /// Kafka topic carrying accepted bundle events. + #[arg(long = "metering-kafka-topic")] + pub metering_kafka_topic: Option, + + /// Kafka consumer group id for metering ingestion. + #[arg(long = "metering-kafka-group-id")] + pub metering_kafka_group_id: Option, + + /// Optional Kafka properties file (key=value per line) merged into the consumer config. + #[arg(long = "metering-kafka-properties-file")] + pub metering_kafka_properties_file: Option, + + /// Gas limit per flashblock used for priority fee estimation. + #[arg(long = "metering-gas-limit", default_value_t = DEFAULT_GAS_LIMIT)] + pub metering_gas_limit: u64, + + /// Execution time budget per block in microseconds for priority fee estimation. + #[arg(long = "metering-execution-time-us", default_value_t = DEFAULT_EXECUTION_TIME_US)] + pub metering_execution_time_us: u128, + + /// State root time budget per block in microseconds for priority fee estimation. + /// Not used until per-transaction state-root timing is available in the TIPS Kafka schema. + #[arg(long = "metering-state-root-time-us")] + pub metering_state_root_time_us: Option, + + /// Data availability limit per flashblock in bytes for priority fee estimation. + #[arg(long = "metering-da-bytes", default_value_t = DEFAULT_DA_BYTES)] + pub metering_da_bytes: u64, + + /// Percentile (0.0-1.0) for selecting recommended priority fee among transactions + /// above the inclusion threshold. + #[arg(long = "metering-priority-fee-percentile", default_value_t = DEFAULT_PRIORITY_FEE_PERCENTILE)] + pub metering_priority_fee_percentile: f64, + + /// Priority fee (in wei) returned when a resource is not congested. + #[arg(long = "metering-uncongested-priority-fee", default_value_t = DEFAULT_UNCONGESTED_PRIORITY_FEE)] + pub metering_uncongested_priority_fee: u64, + + /// Number of recent blocks to retain in the metering cache (minimum 1). + #[arg(long = "metering-cache-size", default_value_t = DEFAULT_METERING_CACHE_SIZE)] + pub metering_cache_size: usize, } impl Args { fn flashblocks_enabled(&self) -> bool { self.websocket_url.is_some() } + + fn metering_kafka_settings(&self) -> Option { + if !self.enable_metering { + return None; + } + + let brokers = self.metering_kafka_brokers.clone()?; + let topic = self.metering_kafka_topic.clone()?; + let group_id = self.metering_kafka_group_id.clone()?; + + Some(MeteringKafkaSettings { + brokers, + topic, + group_id, + properties_file: self.metering_kafka_properties_file.clone(), + }) + } +} + +#[derive(Clone)] +struct MeteringKafkaSettings { + brokers: String, + topic: String, + group_id: String, + properties_file: Option, +} + +#[derive(Clone)] +struct MeteringRuntime { + cache: Arc>, + estimator: Arc, + tx_sender: mpsc::UnboundedSender, + flashblock_sender: mpsc::UnboundedSender, +} + +struct CompositeFlashblocksReceiver { + state: Arc>, + /// Optional channel for the metering pipeline; flashblocks RPC still needs the stream even + /// when metering is disabled, so we only forward inclusions if a sender is provided. + metering_sender: Option>, +} + +impl CompositeFlashblocksReceiver { + fn new( + state: Arc>, + metering_sender: Option>, + ) -> Self { + Self { + state, + metering_sender, + } + } +} + +impl base_reth_flashblocks_rpc::subscription::FlashblocksReceiver + for CompositeFlashblocksReceiver +where + FlashblocksState: base_reth_flashblocks_rpc::subscription::FlashblocksReceiver, +{ + fn on_flashblock_received(&self, flashblock: Flashblock) { + metrics::counter!("metering.flashblocks.received").increment(1); + metrics::gauge!("metering.flashblocks.index").set(flashblock.index as f64); + metrics::gauge!("metering.flashblocks.transactions") + .set(flashblock.diff.transactions.len() as f64); + metrics::gauge!("metering.flashblocks.block_number") + .set(flashblock.metadata.block_number as f64); + + self.state.on_flashblock_received(flashblock.clone()); + + let Some(sender) = &self.metering_sender else { + return; + }; + let Some(inclusion) = flashblock_inclusion_from_flashblock(&flashblock) else { + return; + }; + + if sender.send(inclusion).is_err() { + warn!( + target: "metering::flashblocks", + "Failed to forward flashblock inclusion to metering" + ); + } + } +} + +fn flashblock_inclusion_from_flashblock(flashblock: &Flashblock) -> Option { + if flashblock.diff.transactions.is_empty() { + metrics::counter!("metering.flashblocks.empty").increment(1); + return None; + } + + let ordered_tx_hashes: Vec = flashblock + .diff + .transactions + .iter() + .map(|tx_bytes| TxHash::from(keccak256(tx_bytes))) + .collect(); + + Some(FlashblockInclusion { + block_number: flashblock.metadata.block_number, + flashblock_index: flashblock.index, + ordered_tx_hashes, + }) } fn main() { @@ -88,15 +269,132 @@ fn main() { .expect("Unable to init version metadata"); Cli::::parse() - .run(|builder, args| async move { + .run(|mut builder, args| async move { info!(message = "starting custom Base node"); let flashblocks_enabled = args.flashblocks_enabled(); let transaction_tracing_enabled = args.enable_transaction_tracing; let metering_enabled = args.enable_metering; - let op_node = OpNode::new(args.rollup_args.clone()); + let kafka_settings = args.metering_kafka_settings(); + if metering_enabled && kafka_settings.is_none() { + bail!( + "Metering requires Kafka configuration: provide --metering-kafka-brokers, --metering-kafka-topic, and --metering-kafka-group-id" + ); + } + + // Create shared DA config for dynamic updates via miner_setMaxDASize RPC. + // Both the OpNode (miner RPC) and PriorityFeeEstimator share this config. + let da_config = if metering_enabled { + // Enable miner RPC module for miner_setMaxDASize when metering is enabled. + let updated_api = builder + .config() + .rpc + .http_api + .clone() + .unwrap_or_else(|| RpcModuleSelection::from([])) + .append(RethRpcModule::Miner); + builder.config_mut().rpc.http_api = Some(updated_api); + + Some(OpDAConfig::new(0, args.metering_da_bytes)) + } else { + None + }; + + let op_node = if let Some(ref da_config) = da_config { + OpNode::new(args.rollup_args.clone()).with_da_config(da_config.clone()) + } else { + OpNode::new(args.rollup_args.clone()) + }; + + let metering_runtime = if metering_enabled { + if args.metering_cache_size == 0 { + bail!("--metering-cache-size must be at least 1"); + } + let cache = Arc::new(RwLock::new(MeteringCache::new(args.metering_cache_size))); + let limits = ResourceLimits { + gas_used: Some(args.metering_gas_limit), + execution_time_us: Some(args.metering_execution_time_us), + state_root_time_us: args.metering_state_root_time_us, + data_availability_bytes: Some(args.metering_da_bytes), + }; + let estimator = Arc::new(PriorityFeeEstimator::new( + cache.clone(), + args.metering_priority_fee_percentile, + limits, + alloy_primitives::U256::from(args.metering_uncongested_priority_fee), + da_config.clone(), + )); + + let (tx_sender, tx_receiver) = mpsc::unbounded_channel::(); + let (flashblock_sender, flashblock_receiver) = + mpsc::unbounded_channel::(); + + let annotator_cache = cache.clone(); + tokio::spawn(async move { + ResourceAnnotator::new(annotator_cache, tx_receiver, flashblock_receiver) + .run() + .await; + }); + + Some(MeteringRuntime { + cache, + estimator, + tx_sender, + flashblock_sender, + }) + } else { + None + }; + + let metering_runtime_for_flashblocks = metering_runtime.clone(); + + if let (Some(runtime), Some(settings)) = (metering_runtime.clone(), kafka_settings) { + let mut client_config = ClientConfig::new(); + client_config.set("bootstrap.servers", &settings.brokers); + client_config.set("group.id", &settings.group_id); + client_config.set("enable.partition.eof", "false"); + client_config.set("session.timeout.ms", "6000"); + client_config.set("enable.auto.commit", "true"); + client_config.set("auto.offset.reset", "earliest"); + + if let Some(path) = settings.properties_file.as_ref() { + match load_kafka_config_from_file(path) { + Ok(props) => { + for (key, value) in props { + client_config.set(key, value); + } + } + Err(err) => { + warn!( + message = "Failed to load Kafka properties file", + file = %path, + %err + ); + } + } + } + + let tx_sender = runtime.tx_sender.clone(); + let topic = settings.topic.clone(); + tokio::spawn(async move { + let config = KafkaBundleConsumerConfig { + client_config, + topic, + }; + + match KafkaBundleConsumer::new(config, tx_sender) { + Ok(consumer) => consumer.run().await, + Err(err) => error!( + target: "metering::kafka", + %err, + "Failed to initialize Kafka consumer" + ), + } + }); + } let fb_cell: Arc>>> = Arc::new(OnceCell::new()); + let metering_runtime_for_rpc = metering_runtime.clone(); let NodeHandle { node: _node, @@ -140,7 +438,11 @@ fn main() { .extend_rpc_modules(move |ctx| { if metering_enabled { info!(message = "Starting Metering RPC"); - let metering_api = MeteringApiImpl::new(ctx.provider().clone()); + let estimator = metering_runtime_for_rpc + .as_ref() + .map(|runtime| runtime.estimator.clone()) + .expect("metering estimator should be initialized"); + let metering_api = MeteringApiImpl::new(ctx.provider().clone(), estimator); ctx.modules.merge_configured(metering_api.into_rpc())?; } @@ -158,7 +460,15 @@ fn main() { .clone(); fb.start(); - let mut flashblocks_client = FlashblocksSubscriber::new(fb.clone(), ws_url); + let metering_sender = metering_runtime_for_flashblocks + .as_ref() + .map(|runtime| runtime.flashblock_sender.clone()); + let receiver = Arc::new(CompositeFlashblocksReceiver::new( + fb.clone(), + metering_sender, + )); + + let mut flashblocks_client = FlashblocksSubscriber::new(receiver, ws_url); flashblocks_client.start(); let api_ext = EthApiExt::new(