Skip to content

Commit d159a05

Browse files
Fix marker mismatch by buffering out-of-order compilation results
1 parent 06b6b98 commit d159a05

File tree

2 files changed

+289
-38
lines changed

2 files changed

+289
-38
lines changed

crates/apollo_central_sync/src/lib.rs

Lines changed: 211 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,10 @@ pub struct GenericStateSync<
233233
pending_blocks: Vec<(BlockNumber, Block, BlockSignature)>,
234234
// Queue for compilation tasks - maintains ordering while allowing concurrent compilation
235235
compilation_tasks: FuturesOrdered<CompilationTask>,
236+
// Buffer for out-of-order completed compilations (fixes marker mismatch bug)
237+
pending_compilations: std::collections::HashMap<BlockNumber, CompiledBlockData>,
238+
// Track the next expected block number for consecutive batching
239+
next_expected_compilation_block: BlockNumber,
236240
}
237241

238242
pub type StateSyncResult = Result<(), StateSyncError>;
@@ -467,50 +471,51 @@ impl<
467471
// tasks
468472
debug!("Selecting between block sync, state diff sync, and compilation tasks.");
469473

470-
// Check if we need to collect and flush a batch
471-
if self.compilation_tasks.len() >= self.config.block_batch_size {
472-
info!(
473-
"Batch size reached ({} blocks). Collecting compiled results...",
474-
self.compilation_tasks.len()
475-
);
474+
// Try to collect and flush a consecutive batch if we have enough blocks
475+
self.try_flush_consecutive_batch().await?;
476476

477-
// Collect all completed compilations (in order!)
478-
let mut compiled_batch = Vec::new();
479-
for _ in 0..self.config.block_batch_size {
480-
if let Some(result) = self.compilation_tasks.next().await {
481-
match result {
482-
Ok(compiled_block) => {
483-
compiled_batch.push(compiled_block);
477+
// Select from all streams
478+
tokio::select! {
479+
// Poll compilation tasks to buffer completed compilations
480+
Some(result) = self.compilation_tasks.next(), if !self.compilation_tasks.is_empty() => {
481+
match result {
482+
Ok(compiled_block) => {
483+
let block_number = compiled_block.0;
484+
485+
// If this is the first block, initialize next_expected_compilation_block
486+
if self.next_expected_compilation_block == BlockNumber(0) && self.pending_compilations.is_empty() {
487+
info!(
488+
"First block received: {}. Setting as starting point for consecutive batching.",
489+
block_number
490+
);
491+
self.next_expected_compilation_block = block_number;
484492
}
485-
Err(e) => {
486-
error!("Compilation task failed: {:?}", e);
487-
return Err(e);
493+
494+
if block_number == self.next_expected_compilation_block {
495+
// This is the next block we're waiting for
496+
info!(
497+
"Compilation completed for expected block {}. Buffering...",
498+
block_number
499+
);
500+
} else {
501+
// Out-of-order block
502+
info!(
503+
"Compilation completed for block {} (out of order, expecting {}). Buffering...",
504+
block_number,
505+
self.next_expected_compilation_block
506+
);
488507
}
508+
509+
// Always insert and try to flush (regardless of order)
510+
self.pending_compilations.insert(block_number, compiled_block);
511+
self.try_flush_consecutive_batch().await?;
512+
}
513+
Err(e) => {
514+
error!("Compilation task failed: {:?}", e);
515+
return Err(e);
489516
}
490-
} else {
491-
break;
492517
}
493518
}
494-
495-
if !compiled_batch.is_empty() {
496-
let batch_size = compiled_batch.len();
497-
let first_block = compiled_batch.first().map(|(bn, _, _, _, _, _)| *bn);
498-
let last_block = compiled_batch.last().map(|(bn, _, _, _, _, _)| *bn);
499-
info!(
500-
"Collected {batch_size} compiled blocks ({first_block:?} to \
501-
{last_block:?}). Writing to storage in ONE transaction..."
502-
);
503-
504-
// Write all compiled blocks in ONE transaction
505-
self.write_compiled_batch(compiled_batch).await?;
506-
507-
info!("Successfully wrote {batch_size} blocks to storage in one transaction.");
508-
}
509-
}
510-
511-
// Select from all streams
512-
tokio::select! {
513-
// No longer check compilation_tasks here - we collect them in batches above
514519
// Check for sync events from streams
515520
res = block_stream.next() => {
516521
let sync_event = res.expect("Received None from block stream.")?;
@@ -889,6 +894,164 @@ impl<
889894
))
890895
}
891896

897+
// Try to flush a consecutive batch if we have enough blocks
898+
async fn try_flush_consecutive_batch(&mut self) -> StateSyncResult {
899+
let batch_size = self.config.block_batch_size;
900+
901+
// Safety: If buffer is too large, flush consecutive blocks to prevent OOM
902+
// But only if we have enough consecutive blocks to make it worthwhile (maintains batching benefits)
903+
let buffer_size = self.pending_compilations.len();
904+
let max_buffer_size = batch_size * 20; // Allow up to 20 batches worth of buffering (20,000 blocks)
905+
let min_partial_batch_size = batch_size / 10; // Only flush partial batches if we have at least 100 consecutive blocks
906+
907+
if buffer_size > max_buffer_size {
908+
warn!(
909+
"Buffer size ({}) exceeds safety limit ({}). Checking for flushable consecutive blocks...",
910+
buffer_size,
911+
max_buffer_size
912+
);
913+
914+
// Count how many consecutive blocks we have starting from next_expected
915+
let mut consecutive_count = 0;
916+
let mut check_block = self.next_expected_compilation_block;
917+
while consecutive_count < batch_size {
918+
if self.pending_compilations.contains_key(&check_block) {
919+
consecutive_count += 1;
920+
check_block = check_block.unchecked_next();
921+
} else {
922+
break;
923+
}
924+
}
925+
926+
// Only flush if we have a meaningful batch (at least min_partial_batch_size)
927+
if consecutive_count >= min_partial_batch_size {
928+
warn!(
929+
"Flushing {} consecutive blocks (minimum {} required) to prevent OOM while maintaining batching benefits.",
930+
consecutive_count,
931+
min_partial_batch_size
932+
);
933+
934+
// Collect and flush the consecutive blocks
935+
let mut consecutive_blocks = Vec::new();
936+
let mut current_block = self.next_expected_compilation_block;
937+
for _ in 0..consecutive_count {
938+
if let Some(compiled_block) = self.pending_compilations.remove(&current_block) {
939+
consecutive_blocks.push(compiled_block);
940+
current_block = current_block.unchecked_next();
941+
}
942+
}
943+
944+
let first_block = consecutive_blocks.first().map(|(bn, _, _, _, _, _)| *bn);
945+
let last_block = consecutive_blocks.last().map(|(bn, _, _, _, _, _)| *bn);
946+
info!(
947+
"Flushing partial consecutive batch of {} blocks ({:?} to {:?}) due to buffer size limit...",
948+
consecutive_blocks.len(),
949+
first_block,
950+
last_block
951+
);
952+
953+
self.write_compiled_batch(consecutive_blocks).await?;
954+
self.next_expected_compilation_block = current_block;
955+
956+
info!(
957+
"Successfully flushed batch. Next expected block: {}",
958+
self.next_expected_compilation_block
959+
);
960+
} else {
961+
// Emergency fallback: If buffer is extremely large (1.5x threshold), flush whatever we have
962+
let emergency_threshold = max_buffer_size + (max_buffer_size / 2); // 30,000 blocks
963+
if buffer_size > emergency_threshold {
964+
error!(
965+
"EMERGENCY: Buffer size ({}) exceeds emergency threshold ({}). Flushing {} consecutive blocks to prevent OOM (batching degraded).",
966+
buffer_size,
967+
emergency_threshold,
968+
consecutive_count
969+
);
970+
971+
// Flush whatever consecutive blocks we have (even if < min_partial_batch_size)
972+
let mut consecutive_blocks = Vec::new();
973+
let mut current_block = self.next_expected_compilation_block;
974+
for _ in 0..consecutive_count {
975+
if let Some(compiled_block) = self.pending_compilations.remove(&current_block) {
976+
consecutive_blocks.push(compiled_block);
977+
current_block = current_block.unchecked_next();
978+
}
979+
}
980+
981+
if !consecutive_blocks.is_empty() {
982+
let first_block = consecutive_blocks.first().map(|(bn, _, _, _, _, _)| *bn);
983+
let last_block = consecutive_blocks.last().map(|(bn, _, _, _, _, _)| *bn);
984+
info!(
985+
"EMERGENCY flush of {} blocks ({:?} to {:?})",
986+
consecutive_blocks.len(),
987+
first_block,
988+
last_block
989+
);
990+
991+
self.write_compiled_batch(consecutive_blocks).await?;
992+
self.next_expected_compilation_block = current_block;
993+
}
994+
} else {
995+
warn!(
996+
"Buffer size ({}) exceeds limit ({}), but only {} consecutive blocks available (minimum {} required). Waiting for more consecutive blocks to maintain batching benefits.",
997+
buffer_size,
998+
max_buffer_size,
999+
consecutive_count,
1000+
min_partial_batch_size
1001+
);
1002+
}
1003+
}
1004+
1005+
return Ok(());
1006+
}
1007+
1008+
// Normal case: CHECK if we have a full consecutive batch (don't remove yet)
1009+
let mut current_block = self.next_expected_compilation_block;
1010+
let mut count = 0;
1011+
while count < batch_size {
1012+
if self.pending_compilations.contains_key(&current_block) {
1013+
count += 1;
1014+
current_block = current_block.unchecked_next();
1015+
} else {
1016+
// Gap found - not enough consecutive blocks yet
1017+
return Ok(());
1018+
}
1019+
}
1020+
1021+
// We have a full batch! Now remove and collect them
1022+
let mut consecutive_blocks = Vec::new();
1023+
current_block = self.next_expected_compilation_block;
1024+
for _ in 0..batch_size {
1025+
if let Some(compiled_block) = self.pending_compilations.remove(&current_block) {
1026+
consecutive_blocks.push(compiled_block);
1027+
current_block = current_block.unchecked_next();
1028+
}
1029+
}
1030+
1031+
// Flush the batch
1032+
let first_block = consecutive_blocks.first().map(|(bn, _, _, _, _, _)| *bn);
1033+
let last_block = consecutive_blocks.last().map(|(bn, _, _, _, _, _)| *bn);
1034+
info!(
1035+
"Flushing consecutive batch of {} blocks ({:?} to {:?})...",
1036+
consecutive_blocks.len(),
1037+
first_block,
1038+
last_block
1039+
);
1040+
1041+
// Write the batch
1042+
self.write_compiled_batch(consecutive_blocks).await?;
1043+
1044+
// Update next expected block
1045+
self.next_expected_compilation_block = current_block;
1046+
1047+
info!(
1048+
"Successfully flushed batch. Next expected block: {}",
1049+
self.next_expected_compilation_block
1050+
);
1051+
1052+
Ok(())
1053+
}
1054+
8921055
// Write a compiled batch to storage (called after compilation completes)
8931056
async fn write_compiled_batch(&mut self, compiled_batch: CompiledBatchData) -> StateSyncResult {
8941057
let batch_start = Instant::now();
@@ -1465,6 +1628,14 @@ impl StateSync {
14651628
class_manager_client: Option<SharedClassManagerClient>,
14661629
) -> Self {
14671630
let base_layer_source = base_layer_source.map(Arc::new);
1631+
1632+
// Initialize next_expected_compilation_block to current state marker
1633+
let next_expected_compilation_block = reader
1634+
.begin_ro_txn()
1635+
.ok()
1636+
.and_then(|txn| txn.get_state_marker().ok())
1637+
.unwrap_or(BlockNumber(0));
1638+
14681639
Self {
14691640
config,
14701641
shared_highest_block,
@@ -1479,6 +1650,8 @@ impl StateSync {
14791650
class_manager_client,
14801651
pending_blocks: Vec::new(),
14811652
compilation_tasks: FuturesOrdered::new(),
1653+
pending_compilations: std::collections::HashMap::new(),
1654+
next_expected_compilation_block,
14821655
}
14831656
}
14841657
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
#!/bin/bash
2+
3+
set -e
4+
5+
echo "=========================================="
6+
echo "Setting up 200K Fixed Test (Separate from 500K test)"
7+
echo "=========================================="
8+
echo ""
9+
10+
# Colors
11+
GREEN='\033[0;32m'
12+
YELLOW='\033[1;33m'
13+
RED='\033[0;31m'
14+
NC='\033[0m' # No Color
15+
16+
echo -e "${YELLOW}Step 1: Create namespace${NC}"
17+
kubectl create namespace dean-batching-200k-fixed --dry-run=client -o yaml | kubectl apply -f -
18+
echo -e "${GREEN}✓ Namespace created${NC}"
19+
echo ""
20+
21+
echo -e "${YELLOW}Step 2: Copy ConfigMaps from dean-batching namespace${NC}"
22+
# Copy sequencer-configs
23+
kubectl get configmap sequencer-configs -n dean-batching -o yaml | \
24+
sed 's/namespace: dean-batching/namespace: dean-batching-200k-fixed/' | \
25+
sed '/resourceVersion:/d' | \
26+
sed '/uid:/d' | \
27+
sed '/creationTimestamp:/d' | \
28+
kubectl apply -f -
29+
30+
# Copy test scripts
31+
kubectl get configmap dean-test-scripts -n dean-batching -o yaml | \
32+
sed 's/namespace: dean-batching/namespace: dean-batching-200k-fixed/' | \
33+
sed '/resourceVersion:/d' | \
34+
sed '/uid:/d' | \
35+
sed '/creationTimestamp:/d' | \
36+
kubectl apply -f -
37+
38+
echo -e "${GREEN}✓ ConfigMaps copied${NC}"
39+
echo ""
40+
41+
echo -e "${YELLOW}Step 3: Ensure StorageClass exists${NC}"
42+
kubectl apply -f storage-class-balanced-1tb.yaml
43+
echo -e "${GREEN}✓ StorageClass ready${NC}"
44+
echo ""
45+
46+
echo -e "${YELLOW}Step 4: Create PVC (500GB)${NC}"
47+
kubectl apply -f pvc-200k-fixed.yaml
48+
echo -e "${GREEN}✓ PVC created${NC}"
49+
echo ""
50+
51+
echo -e "${YELLOW}Step 5: Wait for PVC to be bound...${NC}"
52+
kubectl wait --for=condition=bound pvc/dean-200k-fixed-pvc -n dean-batching-200k-fixed --timeout=120s
53+
echo -e "${GREEN}✓ PVC bound${NC}"
54+
echo ""
55+
56+
echo -e "${YELLOW}Step 6: Create and start the test job${NC}"
57+
kubectl apply -f batching-test-200k-fixed.yaml
58+
echo -e "${GREEN}✓ Job created${NC}"
59+
echo ""
60+
61+
echo "=========================================="
62+
echo -e "${GREEN}Test Setup Complete!${NC}"
63+
echo "=========================================="
64+
echo ""
65+
echo "Your NEW 200K test is now starting in a separate pod."
66+
echo "Your OLD 500K test continues running undisturbed."
67+
echo ""
68+
echo "Monitor the NEW test with:"
69+
echo " kubectl logs -f -n dean-batching-200k-fixed -l app=batching-test-200k-fixed"
70+
echo ""
71+
echo "Check pod status:"
72+
echo " kubectl get pods -n dean-batching-200k-fixed"
73+
echo ""
74+
echo "Check both tests:"
75+
echo " kubectl get pods -n dean-batching # Old 500K test"
76+
echo " kubectl get pods -n dean-batching-200k-fixed # New 200K test"
77+
echo ""
78+

0 commit comments

Comments
 (0)