@@ -233,6 +233,10 @@ pub struct GenericStateSync<
233233 pending_blocks : Vec < ( BlockNumber , Block , BlockSignature ) > ,
234234 // Queue for compilation tasks - maintains ordering while allowing concurrent compilation
235235 compilation_tasks : FuturesOrdered < CompilationTask > ,
236+ // Buffer for out-of-order completed compilations (fixes marker mismatch bug)
237+ pending_compilations : std:: collections:: HashMap < BlockNumber , CompiledBlockData > ,
238+ // Track the next expected block number for consecutive batching
239+ next_expected_compilation_block : BlockNumber ,
236240}
237241
238242pub type StateSyncResult = Result < ( ) , StateSyncError > ;
@@ -467,50 +471,51 @@ impl<
467471 // tasks
468472 debug ! ( "Selecting between block sync, state diff sync, and compilation tasks." ) ;
469473
470- // Check if we need to collect and flush a batch
471- if self . compilation_tasks . len ( ) >= self . config . block_batch_size {
472- info ! (
473- "Batch size reached ({} blocks). Collecting compiled results..." ,
474- self . compilation_tasks. len( )
475- ) ;
474+ // Try to collect and flush a consecutive batch if we have enough blocks
475+ self . try_flush_consecutive_batch ( ) . await ?;
476476
477- // Collect all completed compilations (in order!)
478- let mut compiled_batch = Vec :: new ( ) ;
479- for _ in 0 ..self . config . block_batch_size {
480- if let Some ( result) = self . compilation_tasks . next ( ) . await {
481- match result {
482- Ok ( compiled_block) => {
483- compiled_batch. push ( compiled_block) ;
477+ // Select from all streams
478+ tokio:: select! {
479+ // Poll compilation tasks to buffer completed compilations
480+ Some ( result) = self . compilation_tasks. next( ) , if !self . compilation_tasks. is_empty( ) => {
481+ match result {
482+ Ok ( compiled_block) => {
483+ let block_number = compiled_block. 0 ;
484+
485+ // If this is the first block, initialize next_expected_compilation_block
486+ if self . next_expected_compilation_block == BlockNumber ( 0 ) && self . pending_compilations. is_empty( ) {
487+ info!(
488+ "First block received: {}. Setting as starting point for consecutive batching." ,
489+ block_number
490+ ) ;
491+ self . next_expected_compilation_block = block_number;
484492 }
485- Err ( e) => {
486- error ! ( "Compilation task failed: {:?}" , e) ;
487- return Err ( e) ;
493+
494+ if block_number == self . next_expected_compilation_block {
495+ // This is the next block we're waiting for
496+ info!(
497+ "Compilation completed for expected block {}. Buffering..." ,
498+ block_number
499+ ) ;
500+ } else {
501+ // Out-of-order block
502+ info!(
503+ "Compilation completed for block {} (out of order, expecting {}). Buffering..." ,
504+ block_number,
505+ self . next_expected_compilation_block
506+ ) ;
488507 }
508+
509+ // Always insert and try to flush (regardless of order)
510+ self . pending_compilations. insert( block_number, compiled_block) ;
511+ self . try_flush_consecutive_batch( ) . await ?;
512+ }
513+ Err ( e) => {
514+ error!( "Compilation task failed: {:?}" , e) ;
515+ return Err ( e) ;
489516 }
490- } else {
491- break ;
492517 }
493518 }
494-
495- if !compiled_batch. is_empty ( ) {
496- let batch_size = compiled_batch. len ( ) ;
497- let first_block = compiled_batch. first ( ) . map ( |( bn, _, _, _, _, _) | * bn) ;
498- let last_block = compiled_batch. last ( ) . map ( |( bn, _, _, _, _, _) | * bn) ;
499- info ! (
500- "Collected {batch_size} compiled blocks ({first_block:?} to \
501- {last_block:?}). Writing to storage in ONE transaction..."
502- ) ;
503-
504- // Write all compiled blocks in ONE transaction
505- self . write_compiled_batch ( compiled_batch) . await ?;
506-
507- info ! ( "Successfully wrote {batch_size} blocks to storage in one transaction." ) ;
508- }
509- }
510-
511- // Select from all streams
512- tokio:: select! {
513- // No longer check compilation_tasks here - we collect them in batches above
514519 // Check for sync events from streams
515520 res = block_stream. next( ) => {
516521 let sync_event = res. expect( "Received None from block stream." ) ?;
@@ -889,6 +894,105 @@ impl<
889894 ) )
890895 }
891896
897+ // Try to flush a consecutive batch if we have enough blocks
898+ async fn try_flush_consecutive_batch ( & mut self ) -> StateSyncResult {
899+ let batch_size = self . config . block_batch_size ;
900+
901+ // Safety: If buffer is too large (5x batch size), flush whatever consecutive blocks we have
902+ // This prevents unbounded memory growth when blocks arrive very out of order
903+ let buffer_size = self . pending_compilations . len ( ) ;
904+ let max_buffer_size = batch_size * 5 ; // Allow up to 5 batches worth of buffering
905+
906+ if buffer_size > max_buffer_size {
907+ warn ! (
908+ "Buffer size ({}) exceeds safety limit ({}). Flushing available consecutive blocks to prevent OOM." ,
909+ buffer_size,
910+ max_buffer_size
911+ ) ;
912+
913+ // Flush whatever consecutive blocks we have (even if less than batch_size)
914+ let mut consecutive_blocks = Vec :: new ( ) ;
915+ let mut current_block = self . next_expected_compilation_block ;
916+
917+ // Collect as many consecutive blocks as possible (up to batch_size)
918+ while consecutive_blocks. len ( ) < batch_size {
919+ if let Some ( compiled_block) = self . pending_compilations . remove ( & current_block) {
920+ consecutive_blocks. push ( compiled_block) ;
921+ current_block = current_block. unchecked_next ( ) ;
922+ } else {
923+ break ; // Gap found, flush what we have
924+ }
925+ }
926+
927+ if !consecutive_blocks. is_empty ( ) {
928+ let first_block = consecutive_blocks. first ( ) . map ( |( bn, _, _, _, _, _) | * bn) ;
929+ let last_block = consecutive_blocks. last ( ) . map ( |( bn, _, _, _, _, _) | * bn) ;
930+ info ! (
931+ "Flushing partial consecutive batch of {} blocks ({:?} to {:?}) due to buffer size limit..." ,
932+ consecutive_blocks. len( ) ,
933+ first_block,
934+ last_block
935+ ) ;
936+
937+ self . write_compiled_batch ( consecutive_blocks) . await ?;
938+ self . next_expected_compilation_block = current_block;
939+
940+ info ! (
941+ "Successfully flushed batch. Next expected block: {}" ,
942+ self . next_expected_compilation_block
943+ ) ;
944+ }
945+
946+ return Ok ( ( ) ) ;
947+ }
948+
949+ // Normal case: CHECK if we have a full consecutive batch (don't remove yet)
950+ let mut current_block = self . next_expected_compilation_block ;
951+ let mut count = 0 ;
952+ while count < batch_size {
953+ if self . pending_compilations . contains_key ( & current_block) {
954+ count += 1 ;
955+ current_block = current_block. unchecked_next ( ) ;
956+ } else {
957+ // Gap found - not enough consecutive blocks yet
958+ return Ok ( ( ) ) ;
959+ }
960+ }
961+
962+ // We have a full batch! Now remove and collect them
963+ let mut consecutive_blocks = Vec :: new ( ) ;
964+ current_block = self . next_expected_compilation_block ;
965+ for _ in 0 ..batch_size {
966+ if let Some ( compiled_block) = self . pending_compilations . remove ( & current_block) {
967+ consecutive_blocks. push ( compiled_block) ;
968+ current_block = current_block. unchecked_next ( ) ;
969+ }
970+ }
971+
972+ // Flush the batch
973+ let first_block = consecutive_blocks. first ( ) . map ( |( bn, _, _, _, _, _) | * bn) ;
974+ let last_block = consecutive_blocks. last ( ) . map ( |( bn, _, _, _, _, _) | * bn) ;
975+ info ! (
976+ "Flushing consecutive batch of {} blocks ({:?} to {:?})..." ,
977+ consecutive_blocks. len( ) ,
978+ first_block,
979+ last_block
980+ ) ;
981+
982+ // Write the batch
983+ self . write_compiled_batch ( consecutive_blocks) . await ?;
984+
985+ // Update next expected block
986+ self . next_expected_compilation_block = current_block;
987+
988+ info ! (
989+ "Successfully flushed batch. Next expected block: {}" ,
990+ self . next_expected_compilation_block
991+ ) ;
992+
993+ Ok ( ( ) )
994+ }
995+
892996 // Write a compiled batch to storage (called after compilation completes)
893997 async fn write_compiled_batch ( & mut self , compiled_batch : CompiledBatchData ) -> StateSyncResult {
894998 let batch_start = Instant :: now ( ) ;
@@ -1465,6 +1569,14 @@ impl StateSync {
14651569 class_manager_client : Option < SharedClassManagerClient > ,
14661570 ) -> Self {
14671571 let base_layer_source = base_layer_source. map ( Arc :: new) ;
1572+
1573+ // Initialize next_expected_compilation_block to current state marker
1574+ let next_expected_compilation_block = reader
1575+ . begin_ro_txn ( )
1576+ . ok ( )
1577+ . and_then ( |txn| txn. get_state_marker ( ) . ok ( ) )
1578+ . unwrap_or ( BlockNumber ( 0 ) ) ;
1579+
14681580 Self {
14691581 config,
14701582 shared_highest_block,
@@ -1479,6 +1591,8 @@ impl StateSync {
14791591 class_manager_client,
14801592 pending_blocks : Vec :: new ( ) ,
14811593 compilation_tasks : FuturesOrdered :: new ( ) ,
1594+ pending_compilations : std:: collections:: HashMap :: new ( ) ,
1595+ next_expected_compilation_block,
14821596 }
14831597 }
14841598}
0 commit comments