Real-time Change Data Capture to Vector Database Pipeline
CDC2Vec is a high-performance, real-time pipeline that captures database changes using Change Data Capture (CDC) and automatically converts them into vector embeddings for storage in vector databases. Perfect for building real-time semantic search, RAG systems, and AI-powered applications.
This project is currently in active development and is NOT production-ready.
- DO NOT use with production databases that contain critical business data
- DO NOT use with databases where data loss would be catastrophic
- Test thoroughly in isolated environments before any production deployment
- Monitor closely during initial deployments
- Backup your data before testing with any database

Appropriate environments:
- Development and testing environments only
- Non-critical databases for experimentation
- Staging environments with test data
- Isolated Docker containers for learning
Use at your own risk. The authors are not responsible for any data loss or system issues.
- Real-time CDC: Captures database changes instantly using PostgreSQL logical replication
- Automatic Vectorization: Converts text data to embeddings using various providers
- Multiple Sinks: Support for Qdrant, Milvus (in progress), and Kafka
- Batch Processing: Efficient batching for high-throughput scenarios
- Health Monitoring: Built-in health checks and metrics
- Docker Ready: Containerized deployment with GitHub Container Registry
| Feature | Status | Description |
|---|---|---|
| Sources | | |
| PostgreSQL CDC | ✅ Completed | Logical replication support with automatic slot/publication management |
| MySQL CDC | 📋 Planned | Binlog-based change data capture |
| MongoDB CDC | 📋 Planned | Change streams support |
| Embedding Providers | | |
| Ollama HTTP | ✅ Completed | Local embedding models via Ollama |
| OpenAI API | 🚧 In Progress | OpenAI embedding models (text-embedding-ada-002, etc.) |
| Hugging Face | 📋 Planned | Direct model inference and API support |
| Cohere | 📋 Planned | Cohere embedding API integration |
| Vector Databases | | |
| Qdrant | ✅ Completed | Full HTTP API support with collection management |
| Milvus | 🚧 In Progress | Scalable vector database support |
| Pinecone | 📋 Planned | Managed vector database service |
| Weaviate | 📋 Planned | GraphQL-based vector database |
| Chroma | 📋 Planned | Open-source embedding database |
| Streaming & Messaging | | |
| Kafka | ✅ Completed | Stream vectors to Kafka topics |
| Redis Streams | 📋 Planned | Redis-based streaming |
| Apache Pulsar | 📋 Planned | Distributed messaging system |
| Operations & Monitoring | | |
| Docker Support | ✅ Completed | Full containerization with multi-stage builds |
| Health Checks | ✅ Completed | HTTP endpoints for monitoring |
| Structured Logging | ✅ Completed | JSON logging with configurable levels |
| Prometheus Metrics | 📋 Planned | Detailed performance and business metrics |
| Grafana Dashboard | 📋 Planned | Pre-built monitoring dashboards |
| Deployment & Scaling | | |
| Kubernetes Manifests | 📋 Planned | Production-ready K8s deployments |
| Helm Charts | 📋 Planned | Parameterized Kubernetes deployments |
| Horizontal Scaling | 📋 Planned | Multi-instance coordination |
- ✅ Completed: Feature is implemented and tested
- 🚧 In Progress: Currently under development
- 📋 Planned: Scheduled for future development
```
┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│  PostgreSQL │───▶│   CDC2Vec   │───▶│  Embedding  │───▶│   Vector    │
│   (Source)  │    │  Pipeline   │    │   Provider  │    │   Database  │
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
                          │
                          ▼
                   ┌─────────────┐
                   │   Health    │
                   │  Endpoint   │
                   └─────────────┘
```
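The flow above decouples the source, the embedding provider, and the sink. A minimal sketch of that shape in Go — the interface and type names here are illustrative only, not the actual CDC2Vec API:

```go
package main

import "fmt"

// Change is a single captured row change (illustrative).
type Change struct {
	ID   string
	Text string
}

// Embedder turns text into a vector.
type Embedder interface {
	Embed(text string) []float32
}

// Sink stores an ID/vector pair.
type Sink interface {
	Upsert(id string, vec []float32)
}

// fakeEmbedder returns a trivial one-dimensional "embedding"
// (the text length) so the example runs without a model.
type fakeEmbedder struct{}

func (fakeEmbedder) Embed(text string) []float32 {
	return []float32{float32(len(text))}
}

// memSink collects vectors in memory.
type memSink struct{ stored map[string][]float32 }

func (m *memSink) Upsert(id string, vec []float32) { m.stored[id] = vec }

// run pushes each change through the embed stage and into the sink.
func run(changes []Change, e Embedder, s Sink) {
	for _, c := range changes {
		s.Upsert(c.ID, e.Embed(c.Text))
	}
}

func main() {
	sink := &memSink{stored: map[string][]float32{}}
	run([]Change{{ID: "1", Text: "hello"}}, fakeEmbedder{}, sink)
	fmt.Println(sink.stored["1"]) // [5]
}
```

Because each stage is an interface, swapping Ollama for OpenAI or Qdrant for Kafka is a configuration change rather than a code change.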
⚠️ Before you start: This is a development project. Only use with test databases or isolated environments. Never use with production data without thorough testing.
- Pull the Docker image:

```bash
docker pull ghcr.io/mehmetymw/cdc2vec:latest
```

- Create a configuration file:

```bash
curl -o config.yaml https://raw.githubusercontent.com/mehmetymw/cdc2vec/master/configs/postgres-qdrant.yaml
```
- Edit the configuration:

```yaml
source:
  type: "postgres"
  postgres:
    dsn: "postgres://username:password@your-postgres:5432/database?sslmode=disable"
    slot: "cdc2vec_slot"
    publication: "cdc2vec_pub"
    create_publication: true
    create_slot: true
    tables: ["public.documents"]
    offset_store: "/data/offsets"

embed:
  provider: "ollama_http"
  model: "nomic-embed-text"
  url: "http://your-ollama:11434"
  normalize: true

sink:
  type: "qdrant"
  qdrant:
    addr: "http://your-qdrant:6333"
    collection: "documents"
    distance: "Cosine"

mapping:
  - table: "public.documents"
    id_column: "id"
    text_columns: ["title", "content"]
    metadata_columns: ["created_at", "author"]

batching:
  batch_size: 64
  flush_interval_ms: 1000

http:
  addr: ":8080"
```

- Run the container:
```bash
docker run -d \
  --name cdc2vec \
  -p 8080:8080 \
  -v $(pwd)/config.yaml:/config.yaml \
  -v $(pwd)/offsets:/data/offsets \
  -e CONFIG_PATH=/config.yaml \
  ghcr.io/mehmetymw/cdc2vec:latest
```

Or run with Docker Compose:

```yaml
version: "3.8"
services:
  cdc2vec:
    image: ghcr.io/mehmetymw/cdc2vec:latest
    ports:
      - "8080:8080"
    volumes:
      - ./config.yaml:/config.yaml
      - ./offsets:/data/offsets
    environment:
      - CONFIG_PATH=/config.yaml
    depends_on:
      - postgres
      - qdrant
      - ollama
    restart: unless-stopped

  # Add your PostgreSQL, Qdrant, and Ollama services here
```

To build and run from source:

- Clone the repository:
```bash
git clone https://github.com/mehmetymw/cdc2vec.git
cd cdc2vec
```

- Build the application:

```bash
go mod tidy
go build -o bin/cdc2vec ./cmd/cdc2vec
```

- Run with configuration:

```bash
export CONFIG_PATH=config.yaml
./bin/cdc2vec
```
⚠️ Setup Warning: Configure PostgreSQL logical replication only on test/development databases. Production databases require careful planning and testing.
Your PostgreSQL instance must be configured for logical replication:
- Enable logical replication in `postgresql.conf`:

```ini
wal_level = logical
max_wal_senders = 10
max_replication_slots = 10
```

- Allow replication connections in `pg_hba.conf` (the example below accepts any address; restrict the range outside throwaway test setups):

```
host replication all 0.0.0.0/0 md5
```

- Create a replication user:

```sql
CREATE USER replica WITH REPLICATION PASSWORD 'your_password';
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO replica;
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA public TO replica;
```

- Restart PostgreSQL to apply configuration changes.

Start Qdrant:

```bash
docker run -p 6333:6333 qdrant/qdrant:latest
```

Coming soon.

Install Ollama and pull an embedding model:

```bash
# Install Ollama
curl -fsSL https://ollama.ai/install.sh | sh

# Pull embedding model
ollama pull nomic-embed-text
```

Coming soon.

PostgreSQL source:

```yaml
source:
  type: "postgres"
  postgres:
    dsn: "postgres://user:pass@host:port/db?sslmode=disable"
    slot: "cdc2vec_slot"            # Replication slot name
    publication: "cdc2vec_pub"      # Publication name
    create_publication: true        # Auto-create publication
    create_slot: true               # Auto-create replication slot
    tables: ["public.table1"]       # Tables to monitor
    offset_store: "/data/offsets"   # Offset storage directory
```

Coming soon.
Ollama:

```yaml
embed:
  provider: "ollama_http"
  model: "mxbai-embed-large"
  url: "http://localhost:11434"
  normalize: true     # Normalize vectors
  vector_size: 1024   # Vector dimension
```
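When `normalize: true` is set, embedding vectors are normalized before storage; the usual choice is L2 (unit-length) normalization, which makes cosine similarity equivalent to a plain dot product. A minimal sketch of L2 normalization:

```go
package main

import (
	"fmt"
	"math"
)

// l2Normalize scales a vector to unit length. After this,
// cosine similarity between two vectors is just their dot product.
func l2Normalize(v []float64) []float64 {
	var sum float64
	for _, x := range v {
		sum += x * x
	}
	norm := math.Sqrt(sum)
	if norm == 0 {
		return v // zero vector: nothing to normalize
	}
	out := make([]float64, len(v))
	for i, x := range v {
		out[i] = x / norm
	}
	return out
}

func main() {
	fmt.Println(l2Normalize([]float64{3, 4})) // [0.6 0.8]
}
```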
OpenAI:

```yaml
embed:
  provider: "openai"
  model: "text-embedding-ada-002"
  api_key: "your-api-key"
  normalize: true
```
Qdrant:

```yaml
sink:
  type: "qdrant"
  qdrant:
    addr: "http://localhost:6333"
    collection: "documents"
    distance: "Cosine"   # Cosine, Dot, Euclid
```
Milvus:

```yaml
sink:
  type: "milvus"
  milvus:
    addr: "localhost:19530"
    collection: "documents"
    metric: "COSINE"
```
Kafka:

```yaml
sink:
  type: "kafka"
  kafka:
    brokers: ["localhost:9092"]
    topic: "vectors"
```
Table mapping:

```yaml
mapping:
  - table: "public.documents"                  # Source table
    id_column: "id"                            # Primary key column
    text_columns: ["title", "content"]         # Columns to embed
    metadata_columns: ["author", "created_at"] # Metadata to store
```
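One plausible way `text_columns` could be combined into a single string for embedding is simple concatenation — note this is a guess for illustration only; the actual payload format is defined by CDC2Vec and not documented here:

```go
package main

import (
	"fmt"
	"strings"
)

// buildText joins the values of the mapped text columns, in order,
// skipping missing or empty ones. Purely illustrative of the
// mapping concept, not CDC2Vec's real behavior.
func buildText(row map[string]string, textColumns []string) string {
	parts := make([]string, 0, len(textColumns))
	for _, col := range textColumns {
		if v, ok := row[col]; ok && v != "" {
			parts = append(parts, v)
		}
	}
	return strings.Join(parts, "\n")
}

func main() {
	row := map[string]string{"title": "Hello", "content": "World"}
	fmt.Println(buildText(row, []string{"title", "content"}))
}
```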
Batching:

```yaml
batching:
  batch_size: 64           # Batch size for processing
  flush_interval_ms: 1000  # Max wait time for batch
```

CDC2Vec provides a health endpoint at `/healthz`:
```bash
curl http://localhost:8080/healthz
```

Response:

```json
{
  "status": "running",
  "last_offset": "0/1234567",
  "batch_size": 5,
  "timestamp": "2024-01-15T10:30:00Z"
}
```

CDC2Vec uses structured logging with different levels:

- DEBUG: Detailed processing information
- INFO: General operational information
- WARN: Warning conditions
- ERROR: Error conditions
Run the tests:

```bash
go test -v ./...
```

Build the Docker image:

```bash
docker build -t cdc2vec .
```

To contribute:

- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
CDC2Vec is designed for high-performance scenarios:
- Throughput: Processes thousands of changes per second
- Latency: Sub-second change detection and processing
- Memory: Efficient batching minimizes memory usage
- Scalability: Horizontal scaling through multiple instances
Performance metrics from real benchmark tests on AMD Ryzen 9 9950X (16 cores / 32 threads):
| Text Size | Avg Latency | Throughput | Memory/Op | Allocs/Op |
|---|---|---|---|---|
| Small (~20 chars) | 212μs | ~4,700 ops/sec | 22.5KB | 127 |
| Medium (~250 chars) | 231μs | ~4,300 ops/sec | 23.0KB | 128 |
| Large (~2.5KB) | 292μs | ~3,400 ops/sec | 40.0KB | 131 |
| Component | Performance | Notes |
|---|---|---|
| PostgreSQL CDC | ~1,000-5,000 changes/sec | Depends on WAL volume and network latency |
| Vector Storage | ~500-2,000 inserts/sec | Varies by vector dimension and batch size |
| Memory Usage | 50-200MB | Base usage, scales with batch size |
| CPU Usage | 10-30% | Single core, embedding generation dominant |
Benchmarks were measured with the Ollama HTTP provider; results may vary with different embedding models and hardware.
If a stale replication slot blocks startup, drop it:

```sql
-- Connect to PostgreSQL and drop the slot
SELECT pg_drop_replication_slot('cdc2vec_slot');
```

If you hit permission errors, grant the necessary privileges:

```sql
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO your_user;
```

For connection problems:

- Check if PostgreSQL allows replication connections
- Verify `pg_hba.conf` configuration
- Ensure firewall allows connections
Enable debug logging by setting log level:
```yaml
# Add to your config or set environment variable
LOG_LEVEL: "DEBUG"
```

Contributions are welcome! Please feel free to submit a pull request. For major changes, open an issue first to discuss what you would like to change.
This software is provided "as is" without warranty of any kind.
- The authors and contributors are not responsible for any data loss, system damage, or business interruption
- Users are responsible for testing the software thoroughly before any production use
- This project is in active development and may contain bugs or incomplete features
- Always backup your data before testing any CDC/vector database tools
- Use only in development, testing, or staging environments until the project reaches production-ready status
By using this software, you acknowledge that you understand the risks and accept full responsibility for any consequences.