Skip to content

Commit 6ed0609

Browse files
Out-of-order (OOO) block-handling fix
1 parent 32bbf67 commit 6ed0609

File tree

1 file changed

+28
-92
lines changed
  • crates/apollo_central_sync/src

1 file changed

+28
-92
lines changed

crates/apollo_central_sync/src/lib.rs

Lines changed: 28 additions & 92 deletions
Original file line number | Diff line number | Diff line change
@@ -898,111 +898,47 @@ impl<
898898
async fn try_flush_consecutive_batch(&mut self) -> StateSyncResult {
899899
let batch_size = self.config.block_batch_size;
900900

901-
// Safety: If buffer is too large, flush consecutive blocks to prevent OOM
902-
// But only if we have enough consecutive blocks to make it worthwhile (maintains batching benefits)
901+
// Safety monitoring: Log warning if buffer is very large
902+
// Since blocks arrive in order from FuturesOrdered, we should always have consecutive blocks
903+
// to flush in normal batches. Only intervene in extreme cases.
903904
let buffer_size = self.pending_compilations.len();
904-
let max_buffer_size = batch_size * 20; // Allow up to 20 batches worth of buffering (2,000 blocks for batch_size=100)
905-
let min_partial_batch_size = batch_size / 10; // Only flush partial batches if we have at least 10 consecutive blocks (for batch_size=100)
906905

907-
if buffer_size > max_buffer_size {
908-
warn!(
909-
"Buffer size ({}) exceeds safety limit ({}). Checking for flushable consecutive blocks...",
910-
buffer_size,
911-
max_buffer_size
906+
// Extreme emergency: Only if buffer exceeds 50,000 blocks (should never happen with ordered arrival)
907+
if buffer_size > 50000 {
908+
error!(
909+
"CRITICAL: Buffer size ({}) is extremely large! This indicates a serious problem with block ordering or flush logic.",
910+
buffer_size
912911
);
913912

914-
// Count how many consecutive blocks we have starting from next_expected
913+
// Count consecutive blocks
915914
let mut consecutive_count = 0;
916915
let mut check_block = self.next_expected_compilation_block;
917-
while consecutive_count < batch_size {
918-
if self.pending_compilations.contains_key(&check_block) {
919-
consecutive_count += 1;
920-
check_block = check_block.unchecked_next();
921-
} else {
922-
break;
916+
while self.pending_compilations.contains_key(&check_block) {
917+
consecutive_count += 1;
918+
check_block = check_block.unchecked_next();
919+
if consecutive_count >= batch_size {
920+
break; // We have a full batch, normal flush will handle it
923921
}
924922
}
925923

926-
// Only flush if we have a meaningful batch (at least min_partial_batch_size)
927-
if consecutive_count >= min_partial_batch_size {
924+
// If we have a full batch, let normal flush handle it
925+
if consecutive_count >= batch_size {
928926
warn!(
929-
"Flushing {} consecutive blocks (minimum {} required) to prevent OOM while maintaining batching benefits.",
930-
consecutive_count,
931-
min_partial_batch_size
932-
);
933-
934-
// Collect and flush the consecutive blocks
935-
let mut consecutive_blocks = Vec::new();
936-
let mut current_block = self.next_expected_compilation_block;
937-
for _ in 0..consecutive_count {
938-
if let Some(compiled_block) = self.pending_compilations.remove(&current_block) {
939-
consecutive_blocks.push(compiled_block);
940-
current_block = current_block.unchecked_next();
941-
}
942-
}
943-
944-
let first_block = consecutive_blocks.first().map(|(bn, _, _, _, _, _)| *bn);
945-
let last_block = consecutive_blocks.last().map(|(bn, _, _, _, _, _)| *bn);
946-
info!(
947-
"Flushing partial consecutive batch of {} blocks ({:?} to {:?}) due to buffer size limit...",
948-
consecutive_blocks.len(),
949-
first_block,
950-
last_block
951-
);
952-
953-
self.write_compiled_batch(consecutive_blocks).await?;
954-
self.next_expected_compilation_block = current_block;
955-
956-
info!(
957-
"Successfully flushed batch. Next expected block: {}",
958-
self.next_expected_compilation_block
927+
"Buffer size ({}) is large, but we have {} consecutive blocks for normal batch flush.",
928+
buffer_size,
929+
consecutive_count
959930
);
931+
// Fall through to normal batch flush logic below
960932
} else {
961-
// Emergency fallback: If buffer is extremely large (1.5x threshold), flush whatever we have
962-
let emergency_threshold = max_buffer_size + (max_buffer_size / 2); // 3,000 blocks (for batch_size=100)
963-
if buffer_size > emergency_threshold {
964-
error!(
965-
"EMERGENCY: Buffer size ({}) exceeds emergency threshold ({}). Flushing {} consecutive blocks to prevent OOM (batching degraded).",
966-
buffer_size,
967-
emergency_threshold,
968-
consecutive_count
969-
);
970-
971-
// Flush whatever consecutive blocks we have (even if < min_partial_batch_size)
972-
let mut consecutive_blocks = Vec::new();
973-
let mut current_block = self.next_expected_compilation_block;
974-
for _ in 0..consecutive_count {
975-
if let Some(compiled_block) = self.pending_compilations.remove(&current_block) {
976-
consecutive_blocks.push(compiled_block);
977-
current_block = current_block.unchecked_next();
978-
}
979-
}
980-
981-
if !consecutive_blocks.is_empty() {
982-
let first_block = consecutive_blocks.first().map(|(bn, _, _, _, _, _)| *bn);
983-
let last_block = consecutive_blocks.last().map(|(bn, _, _, _, _, _)| *bn);
984-
info!(
985-
"EMERGENCY flush of {} blocks ({:?} to {:?})",
986-
consecutive_blocks.len(),
987-
first_block,
988-
last_block
989-
);
990-
991-
self.write_compiled_batch(consecutive_blocks).await?;
992-
self.next_expected_compilation_block = current_block;
993-
}
994-
} else {
995-
warn!(
996-
"Buffer size ({}) exceeds limit ({}), but only {} consecutive blocks available (minimum {} required). Waiting for more consecutive blocks to maintain batching benefits.",
997-
buffer_size,
998-
max_buffer_size,
999-
consecutive_count,
1000-
min_partial_batch_size
1001-
);
1002-
}
933+
// This should never happen with FuturesOrdered - blocks arrive in order!
934+
error!(
935+
"CRITICAL: Buffer has {} blocks but only {} consecutive! This indicates FuturesOrdered is not working as expected.",
936+
buffer_size,
937+
consecutive_count
938+
);
939+
// Don't flush - something is fundamentally wrong
940+
return Ok(());
1003941
}
1004-
1005-
return Ok(());
1006942
}
1007943

1008944
// Normal case: CHECK if we have a full consecutive batch (don't remove yet)

0 commit comments

Comments (0)