# Batch Processing Guide
This guide explains the CodeChunking system's batch processing architecture and embedding generation workflow.
The CodeChunking system uses a multi-layer batch processing architecture that enables efficient embedding generation at scale through:
- Three-layer processing architecture - Batch submission, progress tracking, and queue management
- FIFO queue management - Simple first-in-first-out processing for consistent ordering
- Rate-limit aware submission - Exponential backoff with global rate limit detection
- Token counting - Pre-flight token counting with progressive chunk saving
- Automatic fallback - Sequential processing fallback when batch operations fail
```
┌─────────────────┐    ┌──────────────────┐    ┌─────────────────────┐
│  Job Processor  │───▶│  Token Counter   │───▶│  Processing Mode    │
│                 │    │                  │    │  Decision           │
│ - Receives job  │    │ - Count tokens   │    │                     │
│ - Load chunks   │    │ - Progressive    │    │ chunks >= threshold │
│                 │    │   save (50/batch)│    │   → Batch Mode      │
└─────────────────┘    └──────────────────┘    │ chunks < threshold  │
                                               │   → Sequential Mode │
                                               └─────────────────────┘
                                                          │
         ┌────────────────────────────────────────────────┼───────────┐
         │                                                │           │
         ▼                                                ▼           │
┌──────────────────┐                            ┌──────────────────┐  │
│    Batch Mode    │                            │ Sequential Mode  │  │
│                  │                            │                  │  │
│ - Queue requests │                            │ - One-by-one     │  │
│ - Pre-save chunks│                            │ - Immediate save │  │
│ - Submit batch   │                            │ - Direct API     │  │
└────────┬─────────┘                            └──────────────────┘  │
         │                                                            │
         ▼                                                            │
┌──────────────────┐    ┌──────────────────┐    ┌──────────────────┐  │
│ Batch Submitter  │───▶│    Gemini API    │───▶│   Batch Poller   │  │
│                  │    │                  │    │                  │  │
│ - Rate limiting  │    │ - Files API      │    │ - Poll status    │  │
│ - Exp. backoff   │    │ - Batches API    │    │ - Fetch results  │  │
│ - Retry logic    │    │                  │    │ - Update chunks  │  │
└────────┬─────────┘    └──────────────────┘    └────────┬─────────┘  │
         │                                               │            │
         │              ┌──────────────────┐             │            │
         └─────────────▶│  Batch Progress  │◀────────────┘            │
                        │  Tracking        │                          │
                        │                  │                          │
                        │ - Job status     │                          │
                        │ - Retry state    │                          │
                        │ - File URI cache │                          │
                        └────────┬─────────┘                          │
                                 │                                    │
                                 │  (on failure + fallback enabled)   │
                                 └────────────────────────────────────┘
```
## Batch Submission Layer

Location: `internal/adapter/outbound/gemini/batch_embedding_client.go`
The batch submission layer interfaces directly with Google's Gemini Batches API:
- File Upload: Chunks are converted to JSONL format and uploaded via Google Files API
- Job Creation: Batch job submitted to Gemini with reference to uploaded file
- Request ID Encoding: UUIDs are encoded as `chunk_<uuid_without_hyphens>` for round-trip mapping between chunks and batch results (a sketch follows the constraints below)
Key Constraints:
- Maximum 500 chunks per API batch (hard limit in Gemini API)
- File URI persisted for idempotent retry on transient failures
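To illustrate the round-trip mapping, here is a minimal sketch; the helper names are hypothetical, not the actual identifiers in `batch_embedding_client.go`:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/google/uuid"
)

// encodeRequestID maps a chunk UUID to a batch request ID by stripping
// hyphens and adding the chunk_ prefix.
func encodeRequestID(chunkID uuid.UUID) string {
	return "chunk_" + strings.ReplaceAll(chunkID.String(), "-", "")
}

// decodeRequestID recovers the chunk UUID from a batch result's request ID;
// uuid.Parse accepts the 32-character un-hyphenated form.
func decodeRequestID(requestID string) (uuid.UUID, error) {
	return uuid.Parse(strings.TrimPrefix(requestID, "chunk_"))
}

func main() {
	id := uuid.New()
	encoded := encodeRequestID(id)
	decoded, err := decodeRequestID(encoded)
	fmt.Println(encoded, decoded == id, err) // round trip holds: true, <nil>
}
```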
## Batch Progress Tracking

Location: `internal/domain/entity/batch_job_progress.go`, `internal/application/worker/batch_poller.go`

This layer tracks individual batch jobs through their complete lifecycle:
```
┌───────────────────┐
│ pending_submission│ ◀─────────────────────────────┐
└─────────┬─────────┘                               │
          │ Submitter attempts                      │
          ▼                                         │
┌───────────────────┐                               │
│    processing     │                               │
└─────────┬─────────┘                               │
          │ Poller checks status                    │
          ├────────────────────┐                    │
          ▼                    ▼                    │
┌───────────────┐    ┌─────────────────┐            │
│   completed   │    │ retry_scheduled │────────────┘
└───────────────┘    └────────┬────────┘  (backoff expires)
                              │
                              ▼  (max retries exceeded)
                      ┌───────────────┐
                      │    failed     │
                      └───────────────┘
```
Batch Job States:

| State | Description |
|---|---|
| `pending_submission` | Ready for first submission attempt |
| `processing` | Submitted to Gemini, awaiting results |
| `completed` | Successfully processed |
| `retry_scheduled` | Scheduled for retry with backoff |
| `failed` | Permanent failure (max retries exceeded) |
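A sketch of how these states and transitions could be modeled, mirroring the diagram above; the `BatchJobState` type and transition table are illustrative assumptions, not the entity's actual API:

```go
// Illustrative model of the batch job lifecycle.
type BatchJobState string

const (
	StatePendingSubmission BatchJobState = "pending_submission"
	StateProcessing        BatchJobState = "processing"
	StateCompleted         BatchJobState = "completed"
	StateRetryScheduled    BatchJobState = "retry_scheduled"
	StateFailed            BatchJobState = "failed"
)

// validTransitions mirrors the state diagram: retries loop back to
// pending_submission until max retries are exceeded.
var validTransitions = map[BatchJobState][]BatchJobState{
	StatePendingSubmission: {StateProcessing},
	StateProcessing:        {StateCompleted, StateRetryScheduled},
	StateRetryScheduled:    {StatePendingSubmission, StateFailed},
}

// canTransition reports whether moving from one state to another is allowed.
func canTransition(from, to BatchJobState) bool {
	for _, next := range validTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}
```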
## Batch Queue Manager

Location: `internal/adapter/outbound/queue/batch_queue_manager.go`

This layer manages queuing before batch submission using a simple FIFO (first-in, first-out) queue. Requests are processed in the order they are received, ensuring predictable, fair ordering.
The queue manager tracks queue statistics:
```go
// QueueStats from batch_queue_manager.go
type QueueStats struct {
    QueueSize      int // Current queue depth
    TotalProcessed int // Total requests processed
    TotalBatches   int // Total batches created
    TotalErrors    int // Total processing errors
    // ... other metrics
}
```

When submitting embedding requests, create an `EmbeddingRequest`:
```go
request := &outbound.EmbeddingRequest{
    RequestID: uuid.New().String(),
    Text:      chunkContent,
    Deadline:  timePtr(time.Now().Add(5 * time.Second)), // Optional deadline
    Metadata: map[string]interface{}{
        "repository_id": repoID,
        "chunk_id":      chunkID,
    },
}
```

## Processing Modes

The system chooses between batch and sequential processing based on chunk count:
```go
if len(chunks) >= batchConfig.ThresholdChunks && batchProcessingEnabled {
    // Use batch processing
    processBatchEmbeddings(chunks)
} else {
    // Use sequential processing
    processSequentialEmbeddings(chunks)
}
```

### Sequential Mode

When used:
- Chunk count below threshold
- Batch processing disabled
- Fallback from failed batch operations
Characteristics:
- Process chunks one-by-one
- Each embedding saved immediately to database
- Higher API cost per chunk
- Simple error handling
- Lower memory usage
### Batch Mode

When used:
- Chunk count meets or exceeds threshold
- Batch processing enabled
- Queue manager available
Characteristics:
- Chunks pre-saved to database before API submission
- Single API call for multiple chunks
- Rate-limit aware submission with exponential backoff
- Asynchronous polling for job completion
- Lower API cost per chunk
### Test Mode

Configuration: `use_test_embeddings: true`

Behavior:
- Generates synthetic 768-dimensional vectors locally
- Hash-based deterministic vectors (same content = same vector)
- No API calls - completely offline
- Fast iteration for development and testing
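A minimal sketch of hash-based deterministic vectors, assuming the PRNG seed is derived from a SHA-256 of the chunk content; the actual generator may differ:

```go
import (
	"crypto/sha256"
	"encoding/binary"
	"math/rand"
)

// testEmbedding builds a deterministic 768-dimension vector: the same
// content always produces the same seed, hence the same vector.
func testEmbedding(content string) []float32 {
	seed := sha256.Sum256([]byte(content))
	rng := rand.New(rand.NewSource(int64(binary.LittleEndian.Uint64(seed[:8]))))
	vec := make([]float32, 768)
	for i := range vec {
		vec[i] = rng.Float32()*2 - 1 // values in [-1, 1)
	}
	return vec
}
```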
## Token Counting

Token counting is a pre-flight operation that runs before embedding generation:
| Mode | Behavior | Use Case |
|---|---|---|
| `all` | Count tokens for all chunks | Production (accurate billing) |
| `sample` | Count `sample_percent`% of chunks | Large repositories (estimation) |
| `on_demand` | Skip counting | Performance-critical paths |
Token counts are saved progressively in batches of 50 chunks to:
- Reduce memory pressure
- Enable progress tracking
- Allow recovery on interruption
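A sketch of this progressive-save loop, assuming hypothetical `Chunk`, `countTokens`, and `saveTokenCounts` helpers (the real persistence path may differ):

```go
// Flush token counts every 50 chunks so an interrupted run can resume
// from the last saved batch instead of starting over.
const saveBatchSize = 50

func countAndSave(ctx context.Context, chunks []*Chunk) error {
	for start := 0; start < len(chunks); start += saveBatchSize {
		end := min(start+saveBatchSize, len(chunks))
		for _, c := range chunks[start:end] {
			c.TokenCount = countTokens(c.Content) // pre-flight count
		}
		if err := saveTokenCounts(ctx, chunks[start:end]); err != nil {
			return fmt.Errorf("saving token counts [%d:%d): %w", start, end, err)
		}
	}
	return nil
}
```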
Token counting configuration:

```yaml
token_counting:
  enabled: true
  mode: "all"                 # "all", "sample", or "on_demand"
  sample_percent: 10          # For "sample" mode
  max_tokens_per_chunk: 8192  # Gemini embedding model limit
```

## Rate Limit Handling

The batch submitter implements sophisticated rate limit handling:
- Detection: Identifies rate limit errors from Gemini API responses
- Global Backoff: Applies backoff to all pending submissions
- Exponential Backoff: Increases delay on consecutive failures
- Idempotent Retry: Reuses uploaded file URI to prevent duplicate uploads
```yaml
batch_processing:
  submission_initial_backoff: 1m  # Initial backoff on rate limit
  submission_max_backoff: 30m     # Maximum backoff duration
  max_submission_attempts: 10     # Max retries before permanent failure
```

The delay doubles on each consecutive failure, capped at the maximum:

```
Attempt 1: Fail → Wait 1m
Attempt 2: Fail → Wait 2m
Attempt 3: Fail → Wait 4m
Attempt 4: Fail → Wait 8m
...
Attempt N: Fail → Wait min(2^(N-1) minutes, 30m)
```
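A sketch of that capped doubling, assuming the initial and maximum values come from `submission_initial_backoff` and `submission_max_backoff`:

```go
import "time"

// submissionBackoff returns the wait before retry attempt n (1-based):
// initial * 2^(n-1), capped at max.
func submissionBackoff(n int, initial, max time.Duration) time.Duration {
	backoff := initial << uint(n-1)
	if backoff <= 0 || backoff > max { // <= 0 guards shift overflow
		return max
	}
	return backoff
}
```

With the defaults above, `submissionBackoff(4, time.Minute, 30*time.Minute)` yields 8m, and from attempt 6 onward the 30m cap applies.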
## Configuration

Default configuration:

```yaml
batch_processing:
  enabled: true                 # Enable batch processing
  threshold_chunks: 50          # Min chunks to trigger batch mode
  max_batch_size: 500           # Max chunks per Gemini API batch
  fallback_to_sequential: true  # Fallback on batch failures
  use_test_embeddings: false    # Use synthetic vectors (dev only)

  # Retry configuration
  initial_backoff: 30s
  max_backoff: 300s
  max_retries: 3

  # Submitter configuration
  submitter_poll_interval: 5s
  max_concurrent_submissions: 1
  submission_initial_backoff: 1m
  submission_max_backoff: 30m
  max_submission_attempts: 10

  # Poller configuration
  poller_interval: 30s
  max_concurrent_polls: 5

  # Queue limits
  queue_limits:
    max_queue_size: 10000
    max_wait_time: 30m

  # Token counting
  token_counting:
    enabled: true
    mode: "all"
    sample_percent: 10
    max_tokens_per_chunk: 8192
```

Development/testing overrides:

```yaml
batch_processing:
  threshold_chunks: 10           # Lower threshold for testing
  max_batch_size: 300            # Smaller batches
  fallback_to_sequential: false  # Test batch mode without fallback
  initial_backoff: 5s            # Shorter backoffs
  max_backoff: 60s
  max_retries: 2
```

Production overrides:

```yaml
batch_processing:
  threshold_chunks: 100         # Higher threshold for efficiency
  max_batch_size: 1000          # Larger batches
  fallback_to_sequential: true  # Ensure reliability
  initial_backoff: 1m           # Longer backoffs
  max_backoff: 10m
  max_retries: 5
  poller_interval: 60s          # Less frequent polling
  max_concurrent_polls: 10      # More parallel polls
  queue_limits:
    max_queue_size: 50000       # Larger queue
    max_wait_time: 60m
```

## Metrics

The system emits OpenTelemetry metrics for monitoring:
Throughput:

- `batch_chunks_processed` - Total chunks processed
- `batch_jobs_submitted` - Batch jobs submitted to Gemini
- `batch_jobs_completed` - Successfully completed jobs

Latency:

- `batch_submission_latency` - Time from queue to submission
- `batch_processing_latency` - Gemini processing time
- `batch_end_to_end_latency` - Total processing time

Errors:

- `batch_submission_failures` - Failed submissions
- `batch_rate_limit_hits` - Rate limit encounters
- `batch_fallback_count` - Sequential fallback invocations
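For reference, counters like these are registered through the OpenTelemetry Go SDK roughly as below; the instrument names come from the list above, while the meter name and function are assumptions:

```go
import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

// recordBatchSubmission bumps the submission counters for one batch job.
func recordBatchSubmission(ctx context.Context, chunkCount int) error {
	meter := otel.Meter("codechunking/batch") // assumed instrumentation scope
	jobs, err := meter.Int64Counter("batch_jobs_submitted",
		metric.WithDescription("Batch jobs submitted to Gemini"))
	if err != nil {
		return err
	}
	chunks, err := meter.Int64Counter("batch_chunks_processed",
		metric.WithDescription("Total chunks processed"))
	if err != nil {
		return err
	}
	jobs.Add(ctx, 1)
	chunks.Add(ctx, int64(chunkCount))
	return nil
}
```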
Key log messages:

Test Mode Active:

```
"Processing batch embedding results (TEST MODE)"
"Using test embeddings for development"
```

Production Mode Active:

```
"Processing batch embedding results (PRODUCTION MODE)"
"chunk_count": 8269
```

Fallback Triggered:

```
"Batch processing failed, falling back to sequential"
```
## Troubleshooting

### Batch Mode Not Triggering

Symptoms: Always using sequential processing

Check:

- `batch_processing.enabled` is `true`
- Chunk count meets `threshold_chunks`
- Queue manager initialized (check logs for "batch queue manager initialized")
### Rate Limit Errors

Symptoms: Submissions failing with rate limit errors

Solutions:

- Reduce `max_concurrent_submissions` to 1
- Increase `submission_initial_backoff`
- Check Gemini API quota limits
### Slow Batch Processing

Symptoms: Slow embedding generation

Solutions:

- Increase `max_concurrent_polls` for faster status checking
- Reduce `poller_interval` for more frequent checks
- Consider reducing `max_batch_size` for faster individual batches
### Queue Backlog

Symptoms: Requests waiting too long in queue

Solutions:

- Increase `max_concurrent_submissions`
- Check for rate limit backoff (review logs)
- Increase worker concurrency
Useful debugging commands:

```bash
# Check current queue status
codechunking worker --debug --log-level debug

# Monitor batch metrics
curl -s http://localhost:8080/metrics | grep batch_

# View worker logs with batch info
make logs-worker | grep -E "(batch|embedding)"
```

## Batch Sizing

For regular repository processing, a balanced batch size:

```go
job := &EmbeddingJob{
    Chunks:       repoChunks,
    MaxBatchSize: 200, // Balanced for throughput
}
```

For large migrations, the maximum batch size:

```go
job := &EmbeddingJob{
    Chunks:       migrationChunks,
    MaxBatchSize: 500, // Maximum efficiency
}
```

| Strategy | Throughput | Latency | Cost Efficiency |
|---|---|---|---|
| Small Batches | Low | Minimal | Low |
| Medium Batches | Medium | Low | Medium |
| Large Batches | High | Medium | High |
| Maximum Batches | Maximum | High | Maximum |
Cost considerations:

- Batch Mode: Reduces API calls significantly compared to sequential
- Token Counting: Enables accurate cost prediction and billing
- Rate Limit Awareness: Avoids expensive retry storms
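For example, the 8,269-chunk production run shown in the log excerpt above needs only ⌈8269 / 500⌉ = 17 batch submissions at the default `max_batch_size` of 500, versus 8,269 individual embedding calls in sequential mode.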
## Verifying Production Operation

To confirm batch processing is working correctly:
- Check Configuration:
  - `use_test_embeddings: false` (for production)
  - `batch_processing.enabled: true`
  - `CODECHUNK_GEMINI_API_KEY` environment variable set
- Monitor Logs:
  - Look for "Processing batch embedding results (PRODUCTION MODE)"
  - Verify chunk counts in batch submissions
- Review Metrics:
  - `batch_jobs_submitted` should increase
  - `batch_fallback_count` should remain low
See BATCH_JOB_STATUS.md for additional troubleshooting and monitoring guidance.