A real-time data pipeline that simulates user activity, streams events via Apache Kafka, and lands them into Apache Iceberg on S3 for analytics. Built with FastAPI, Spark Structured Streaming, and Docker Compose for end-to-end streaming data architecture.
- Simulates realistic e-commerce user events using Faker
- Ingests events via FastAPI into Kafka topics
- Streams and processes data using Spark Structured Streaming
- Stores data in Apache Iceberg with partitioning and schema evolution
- Supports S3-compatible storage (e.g., AWS, MinIO)
- Fully containerized with Docker Compose
- Includes data quality checks and console logging for invalid records
- Schema auto-evolution and fanout write support enabled
- Docker and Docker Compose
- S3-compatible storage (AWS S3 recommended)
- AWS credentials configured (via IAM or environment)
- Python 3.8+ (for local testing)
- `python-dotenv` (if using a `.env` file)
```bash
git clone https://github.com/Treespunking/E-commerce-Real-time-Data-Pipeline.git
cd E-commerce-Real-time-Data-Pipeline
```

Create a `.env` file with your S3 bucket and AWS credentials:
```env
ICEBERG_S3_BUCKET=your-iceberg-bucket-name
AWS_ACCESS_KEY_ID=your-access-key
AWS_SECRET_ACCESS_KEY=your-secret-key
```

Never commit this file! It should be in `.gitignore`.
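For local runs outside Docker, the scripts can pick these values up with `python-dotenv`. A minimal sketch, assuming the variable names from the `.env` above:

```python
# Minimal sketch: load the .env values with python-dotenv for local runs.
import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current directory into the process environment

ICEBERG_S3_BUCKET = os.environ["ICEBERG_S3_BUCKET"]
AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
```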
```bash
docker-compose up --build
```

This will:
- Start Zookeeper and Kafka
- Create the `ecommerce_events` topic
- Launch the FastAPI ingestion endpoint
- Submit the Spark job to stream into Iceberg
Once the stack is running, generate sample events:
```bash
python event_generator.py
```

Events are sent to `http://localhost:8000/events` and streamed into Kafka → Spark → Iceberg.
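`event_generator.py` is not reproduced here, but the pattern is simple: build a fake event with Faker and POST it to the ingestion endpoint. A hedged sketch (the field names and send rate are illustrative assumptions, not the repo's exact schema):

```python
# Illustrative event generator: Faker events POSTed to the FastAPI endpoint.
# Field names and the send interval are assumptions, not the repo's exact schema.
import random
import time

import requests
from faker import Faker

fake = Faker()
EVENT_TYPES = ["page_view", "add_to_cart", "purchase"]

def make_event() -> dict:
    return {
        "user_id": fake.uuid4(),
        "event_type": random.choice(EVENT_TYPES),
        "product_id": fake.uuid4(),
        "price": round(random.uniform(5, 500), 2),
        "event_timestamp": fake.iso8601(),
    }

if __name__ == "__main__":
    while True:
        resp = requests.post("http://localhost:8000/events", json=make_event())
        resp.raise_for_status()
        time.sleep(0.5)
```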
```
[Faker Events]
        ↓
[FastAPI Server] → POST /events
        ↓
[Kafka Topic: ecommerce_events]
        ↓
[Spark Structured Streaming]
        ↓
[Apache Iceberg Table (S3)]
```
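The FastAPI layer in this flow is a thin pass-through: it accepts `POST /events` and publishes the payload to Kafka. A sketch of that pattern, assuming `kafka-python`, a `kafka:9092` broker address, and an untyped JSON payload (the real `api/main.py` may differ):

```python
# Sketch of a FastAPI ingestion endpoint that forwards events to Kafka.
# Broker address, topic name, and payload handling are assumptions.
import json

from fastapi import FastAPI
from kafka import KafkaProducer

app = FastAPI()
producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

@app.post("/events")
async def ingest_event(event: dict):
    # Publish the raw JSON event to the topic consumed by Spark.
    producer.send("ecommerce_events", value=event)
    return {"status": "queued"}
```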
- Table: `local.db.ecommerce_events`
- Location: `s3a://<your-bucket>/iceberg-warehouse/db/ecommerce_events`
- Partitioned by: `event_type`, `event_date`
- Checkpointing: Enabled for fault tolerance
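The streaming job itself follows the standard Kafka-to-Iceberg pattern: read the topic, parse the JSON payload against a schema, derive `event_date`, and append to the partitioned table with checkpointing and fanout enabled. A condensed sketch (schema fields, broker address, trigger interval, and checkpoint path are assumptions; catalog and package configuration is assumed to come from `spark-submit`):

```python
# Condensed sketch of the Kafka -> Iceberg streaming job (not the repo's exact code).
# The Iceberg catalog, Kafka connector, and S3 settings are assumed to be supplied
# via spark-submit / spark-defaults and are omitted here.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_date
from pyspark.sql.types import DoubleType, StringType, StructField, StructType, TimestampType

spark = SparkSession.builder.appName("ecommerce-stream").getOrCreate()

# Illustrative event schema; the real one lives in spark_streaming.py.
schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("product_id", StringType()),
    StructField("price", DoubleType()),
    StructField("event_timestamp", TimestampType()),
])

events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "ecommerce_events")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("e"))
    .select("e.*")
    .withColumn("event_date", to_date(col("event_timestamp")))
)

query = (
    events.writeStream.format("iceberg")
    .outputMode("append")
    .trigger(processingTime="30 seconds")
    .option("fanout-enabled", "true")  # allow unsorted writes across partitions
    .option("checkpointLocation", "/tmp/checkpoints/ecommerce_events")
    .toTable("local.db.ecommerce_events")
)
query.awaitTermination()
```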
Verify Kafka is working:
```bash
python test_kafka.py
```

Expected output:

```
Success! Connected to Kafka
Topics: ['ecommerce_events', ...]
```
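For reference, a minimal connectivity check with `kafka-python` looks roughly like this (the repo's `test_kafka.py` may differ; adjust the broker address to the port docker-compose exposes on your host):

```python
# Minimal Kafka connectivity check (the repo's test_kafka.py may differ).
from kafka import KafkaConsumer

# Adjust the broker address to the listener exposed by docker-compose on your host.
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
topics = consumer.topics()

print("Success! Connected to Kafka")
print(f"Topics: {sorted(topics)}")
```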
After streaming events, explore your Iceberg table directly using Spark SQL or connect with tools like:
- AWS Athena (for querying)
- Trino/Presto (for federated queries)
- Jupyter Notebook with PySpark
Example query in Spark:
```sql
SELECT event_type, COUNT(*) FROM local.db.ecommerce_events GROUP BY event_type;
```
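To run that query from a local PySpark session, the Iceberg catalog and S3 settings have to match the streaming job. A hedged sketch (package versions and the warehouse path are assumptions; tune them to your Spark version and bucket layout):

```python
# Sketch of querying the Iceberg table from PySpark.
# The catalog name "local" and warehouse path mirror the table identifier above;
# package versions are assumptions — match them to your Spark version.
# AWS credentials are picked up from the environment by hadoop-aws.
import os
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("iceberg-query")
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,"
            "org.apache.hadoop:hadoop-aws:3.3.4")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse",
            f"s3a://{os.environ['ICEBERG_S3_BUCKET']}/iceberg-warehouse")
    .getOrCreate()
)

spark.sql(
    "SELECT event_type, COUNT(*) AS events "
    "FROM local.db.ecommerce_events GROUP BY event_type"
).show()
```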

```
pipeline/
│
├── docker-compose.yml # Orchestration of all services
├── .env # Environment variables (S3 bucket, AWS keys)
│
├── api/
│ ├── main.py # FastAPI endpoint to ingest events
│ ├── requirements.txt # FastAPI & Kafka deps
│ └── Dockerfile # API container
│
├── spark_streaming.py # Spark job: reads Kafka, writes to Iceberg
├── event_generator.py # Generates fake e-commerce events
├── test_kafka.py # Test script for Kafka connectivity
│
└── spark-apps/ # Mounted volume for Spark jobs
    └── spark_streaming.py
```
- Schema Evolution: Enabled via `spark.sql.iceberg.schema.auto.add.columns`
- Fanout Writes: Enabled to support multiple writers
- Checkpointing: Ensures exactly-once processing
- PERMISSIVE JSON Parsing: Invalid fields are set to `null` instead of failing
- S3 Integration: Uses `hadoop-aws` and `S3AFileSystem`
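To see the PERMISSIVE behaviour in isolation, here is a small self-contained example (not from the repo) showing `from_json` turning a malformed record into nulls instead of failing the query:

```python
# PERMISSIVE JSON parsing in isolation: malformed records become nulls, the query keeps running.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

spark = SparkSession.builder.appName("permissive-demo").getOrCreate()

schema = StructType([
    StructField("event_type", StringType()),
    StructField("price", DoubleType()),
])

df = spark.createDataFrame(
    [
        ('{"event_type": "purchase", "price": 9.99}',),  # valid JSON
        ('{"event_type": "view", "price": }',),          # malformed JSON
    ],
    ["value"],
)

# The malformed second row yields null columns instead of aborting the job.
df.select(
    from_json(col("value"), schema, {"mode": "PERMISSIVE"}).alias("e")
).select("e.*").show()
```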