22
33use std:: { io:: Read , sync:: Arc , time:: Duration } ;
44
5- use alloy_primitives:: { Address , B256 , U256 , map:: foldhash:: HashMap } ;
6- use alloy_rpc_types_engine:: PayloadId ;
75use futures_util:: { SinkExt as _, StreamExt } ;
8- use reth_optimism_primitives:: OpReceipt ;
9- use rollup_boost:: {
10- ExecutionPayloadBaseV1 , ExecutionPayloadFlashblockDeltaV1 , FlashblocksPayloadV1 ,
11- } ;
12- use serde:: { Deserialize , Serialize } ;
6+ use rollup_boost:: FlashblocksPayloadV1 ;
137use tokio:: { sync:: mpsc, time:: interval} ;
148use tokio_tungstenite:: { connect_async, tungstenite:: protocol:: Message } ;
159use tracing:: { error, info, trace, warn} ;
1610use url:: Url ;
1711
18- use crate :: Metrics ;
19-
20- /// Interval of liveness check of upstream, in milliseconds.
21- pub const PING_INTERVAL_MS : u64 = 500 ;
22-
23- /// Max duration of backoff before reconnecting to upstream.
24- pub const MAX_BACKOFF : Duration = Duration :: from_secs ( 10 ) ;
12+ use crate :: { Flashblock , Metadata , Metrics } ;
2513
2614/// Trait for receiving flashblock updates.
2715pub trait FlashblocksReceiver {
2816 /// Called when a new flashblock is received.
2917 fn on_flashblock_received ( & self , flashblock : Flashblock ) ;
3018}
3119
32- /// Metadata associated with a flashblock.
33- #[ derive( Debug , Deserialize , Serialize , Clone , Default ) ]
34- pub struct Metadata {
35- /// Transaction receipts indexed by hash.
36- pub receipts : HashMap < B256 , OpReceipt > ,
37- /// Updated account balances.
38- pub new_account_balances : HashMap < Address , U256 > ,
39- /// Block number this flashblock belongs to.
40- pub block_number : u64 ,
41- }
42-
43- /// A flashblock containing partial block data.
44- #[ derive( Debug , Clone ) ]
45- pub struct Flashblock {
46- /// Unique payload identifier.
47- pub payload_id : PayloadId ,
48- /// Index of this flashblock within the block.
49- pub index : u64 ,
50- /// Base payload data (only present on first flashblock).
51- pub base : Option < ExecutionPayloadBaseV1 > ,
52- /// Delta containing transactions and state changes.
53- pub diff : ExecutionPayloadFlashblockDeltaV1 ,
54- /// Associated metadata.
55- pub metadata : Metadata ,
56- }
57-
5820// Simplify actor messages to just handle shutdown
5921#[ derive( Debug ) ]
6022enum ActorMessage {
@@ -73,6 +35,12 @@ impl<Receiver> FlashblocksSubscriber<Receiver>
7335where
7436 Receiver : FlashblocksReceiver + Send + Sync + ' static ,
7537{
38+ /// Interval of liveness check of upstream, in milliseconds.
39+ pub const PING_INTERVAL_MS : u64 = 500 ;
40+
41+ /// Max duration of backoff before reconnecting to upstream.
42+ pub const MAX_BACKOFF : Duration = Duration :: from_secs ( 10 ) ;
43+
7644 /// Creates a new flashblocks subscriber.
7745 pub fn new ( flashblocks_state : Arc < Receiver > , ws_url : Url ) -> Self {
7846 Self { ws_url, flashblocks_state, metrics : Metrics :: default ( ) }
9866 Ok ( ( ws_stream, _) ) => {
9967 info ! ( message = "WebSocket connection established" ) ;
10068
101- let mut ping_interval = interval ( Duration :: from_millis ( PING_INTERVAL_MS ) ) ;
69+ let mut ping_interval =
70+ interval ( Duration :: from_millis ( Self :: PING_INTERVAL_MS ) ) ;
10271 let mut awaiting_pong_resp = false ;
10372
10473 let ( mut write, mut read) = ws_stream. split ( ) ;
@@ -152,11 +121,11 @@ where
152121 warn!(
153122 target: "flashblocks_rpc::subscription" ,
154123 ?backoff,
155- timeout_ms = PING_INTERVAL_MS ,
124+ timeout_ms = Self :: PING_INTERVAL_MS ,
156125 "No pong response from upstream, reconnecting" ,
157126 ) ;
158127
159- backoff = sleep( & metrics, backoff) . await ;
128+ backoff = Self :: sleep( & metrics, backoff) . await ;
160129 break ' conn;
161130 }
162131
@@ -172,7 +141,7 @@ where
172141 "WebSocket connection lost, reconnecting" ,
173142 ) ;
174143
175- backoff = sleep( & metrics, backoff) . await ;
144+ backoff = Self :: sleep( & metrics, backoff) . await ;
176145 break ' conn;
177146 }
178147 awaiting_pong_resp = true
@@ -187,7 +156,7 @@ where
187156 error = %e
188157 ) ;
189158
190- backoff = sleep ( & metrics, backoff) . await ;
159+ backoff = Self :: sleep ( & metrics, backoff) . await ;
191160 continue ;
192161 }
193162 }
@@ -205,13 +174,13 @@ where
205174 }
206175 } ) ;
207176 }
208- }
209177
210- /// Sleeps for given backoff duration. Returns incremented backoff duration, capped at [`MAX_BACKOFF`].
211- async fn sleep ( metrics : & Metrics , backoff : Duration ) -> Duration {
212- metrics. reconnect_attempts . increment ( 1 ) ;
213- tokio:: time:: sleep ( backoff) . await ;
214- std:: cmp:: min ( backoff * 2 , MAX_BACKOFF )
178+ /// Sleeps for given backoff duration. Returns incremented backoff duration, capped at [`MAX_BACKOFF`].
179+ async fn sleep ( metrics : & Metrics , backoff : Duration ) -> Duration {
180+ metrics. reconnect_attempts . increment ( 1 ) ;
181+ tokio:: time:: sleep ( backoff) . await ;
182+ std:: cmp:: min ( backoff * 2 , Self :: MAX_BACKOFF )
183+ }
215184}
216185
217186fn try_decode_message ( bytes : & [ u8 ] ) -> eyre:: Result < Flashblock > {
0 commit comments