Kafnus Connect is the persistence layer of the Kafnus ecosystem — a modern, Kafka-based replacement for Cygnus in FIWARE smart city environments.
It provides ready-to-use Kafka Connect images with custom Single Message Transforms (SMTs) and pre-integrated sink connectors for PostGIS, MongoDB, and HTTP endpoints.
Kafnus Connect consumes processed NGSI events from Kafka topics (produced by Kafnus NGSI) and persists them into target datastores or APIs.
- 🗺️ PostGIS (via custom JDBC connector)
  - Forked and extended to handle GeoJSON geometries and NGSI-specific data structures.
- 📦 MongoDB
  - Official MongoDB Kafka connector for JSON document storage.
- 🌐 HTTP
  - Aiven Open HTTP Connector for forwarding events to REST endpoints.
  - Forked to handle 200 responses with errors.
Kafka (processed topics)
│
▼
Kafnus Connect (Kafka Connect)
├─ JDBC Sink (PostGIS)
├─ MongoDB Sink
└─ HTTP Sink
Each connector can be independently configured via environment variables or connect-distributed.properties.
Custom SMTs can be chained to transform headers or message formats before persistence.
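As a rough illustration of SMT chaining, the sketch below writes a sink connector definition with two chained header transforms and, only when `CONNECT_URL` is set, submits it to the Kafka Connect REST API. It uses Kafka's built-in `InsertHeader` and `DropHeaders` SMTs as stand-ins, because the class names of the Kafnus custom SMTs are not given here. The connector name, topic, header names, MongoDB URI, database, and collection are all hypothetical placeholders.

```shell
set -eu

# Hypothetical connector definition: names, topic, and URI are placeholders.
# Transforms are applied in the order listed under "transforms".
CONFIG_FILE="${CONFIG_FILE:-mongo-sink-example.json}"
cat > "$CONFIG_FILE" <<'EOF'
{
  "name": "example-mongo-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "example_processed_topic",
    "connection.uri": "mongodb://mongo:27017",
    "database": "example_db",
    "collection": "example_collection",
    "transforms": "addSource,dropInternal",
    "transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertHeader",
    "transforms.addSource.header": "source",
    "transforms.addSource.value.literal": "kafnus",
    "transforms.dropInternal.type": "org.apache.kafka.connect.transforms.DropHeaders",
    "transforms.dropInternal.headers": "example-internal-header"
  }
}
EOF
echo "wrote $CONFIG_FILE"

# Submit only when CONNECT_URL is set (for example http://localhost:8083).
if [ -n "${CONNECT_URL:-}" ]; then
  curl -sS -X POST -H "Content-Type: application/json" \
       --data @"$CONFIG_FILE" "$CONNECT_URL/connectors"
fi
```

Running the script without `CONNECT_URL` only writes the JSON file, which makes it easy to review the transform chain before registering anything.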
docker build -t telefonicaiot/kafnus-connect:latest .

docker run -d --name kafnus-connect \
  -e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
  -e CONNECT_GROUP_ID=kafnus-connect \
  -e CONNECT_CONFIG_STORAGE_TOPIC=connect-configs \
  -e CONNECT_OFFSET_STORAGE_TOPIC=connect-offsets \
  -e CONNECT_STATUS_STORAGE_TOPIC=connect-status \
  telefonicaiot/kafnus-connect:latest

For complete examples, see the tests_end2end folder in the main Kafnus repository.
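Once the container is running, one way to check that the worker is up is to poll the Kafka Connect REST API and list the installed plugins. This is a minimal sketch, assuming the REST API listens on Kafka Connect's default port 8083 and that the port is reachable from where the script runs (the `docker run` command above does not publish it, so something like `-p 8083:8083` would be needed). The `wait_for_connect` helper and the `CONNECT_URL` variable are hypothetical names introduced for this example.

```shell
set -eu

# Poll the Kafka Connect REST API until it answers or attempts run out.
# Arguments: base URL, max attempts (default 30), delay in seconds (default 2).
wait_for_connect() {
  local url="$1" attempts="${2:-30}" delay="${3:-2}" i=1
  while [ "$i" -le "$attempts" ]; do
    if curl -fsS "$url/connectors" >/dev/null 2>&1; then
      return 0
    fi
    sleep "$delay"
    i=$((i + 1))
  done
  return 1
}

# Runs only when CONNECT_URL is set, e.g. CONNECT_URL=http://localhost:8083
if [ -n "${CONNECT_URL:-}" ]; then
  wait_for_connect "$CONNECT_URL"
  # List installed plugins; the bundled sink connectors should appear here.
  curl -fsS "$CONNECT_URL/connector-plugins"
fi
```

The helper returns a non-zero status when the worker never answers, so with `set -e` the script stops before trying to list plugins.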
Integration and end-to-end testing are performed from the Kafnus NGSI repository, where complete data flow scenarios are executed using Testcontainers.
- Custom SMTs are available in src/header-router/.
- New sinks can be added by extending the base image and adding plugins under /usr/share/java/.
- Monitoring via Prometheus JMX Exporter is supported out of the box.
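Extending the base image with an extra sink could look roughly like the sketch below, which generates a small Dockerfile that copies a plugin directory under /usr/share/java/. The plugin directory name `my-sink-connector`, the file name `Dockerfile.custom`, the output tag `kafnus-connect-custom:latest`, and the `BUILD_IMAGE` switch are hypothetical. The sketch also assumes the plugin's JARs are already unpacked in that directory next to the script.

```shell
set -eu

# Hypothetical plugin directory and Dockerfile name; replace with real ones.
PLUGIN_DIR="${PLUGIN_DIR:-my-sink-connector}"
DOCKERFILE="${DOCKERFILE:-Dockerfile.custom}"

# Unquoted heredoc: ${PLUGIN_DIR} is expanded by the shell on purpose.
cat > "$DOCKERFILE" <<EOF
FROM telefonicaiot/kafnus-connect:latest
# Keep each plugin in its own subdirectory under /usr/share/java/.
COPY ${PLUGIN_DIR}/ /usr/share/java/${PLUGIN_DIR}/
EOF
echo "wrote $DOCKERFILE"

# Build only when explicitly requested, so the script is safe to dry-run.
if [ "${BUILD_IMAGE:-0}" = "1" ]; then
  docker build -f "$DOCKERFILE" -t kafnus-connect-custom:latest .
fi
```

Keeping the plugin in its own subdirectory follows the usual Kafka Connect convention of one directory per plugin, which helps avoid classpath clashes between connectors.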
For deeper technical details about how Kafnus Connect is configured, built, and extended — including:
- Environment setup and logging configuration
- Plugin management and sink registration
- Supported sinks and custom SMTs
- Usage of EnvVarConfigProvider for connector configuration
👉 See Technical Configuration Guide
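As a quick taste of the last item, the sketch below shows one way Kafka's `EnvVarConfigProvider` can keep credentials out of connector definitions. It is not taken from the guide. It writes a worker properties fragment that registers the provider under the alias `env`, and a connector definition that refers to environment variables through `${env:...}` placeholders. The file names, connector name, topic, database, collection, and the `MONGO_USER` and `MONGO_PASSWORD` variables are hypothetical, and the provider requires a Kafka version that ships it (3.5 or later).

```shell
set -eu

# Worker-side fragment: registers Kafka's EnvVarConfigProvider as "env".
cat > worker-config-provider.properties <<'EOF'
config.providers=env
config.providers.env.class=org.apache.kafka.common.config.provider.EnvVarConfigProvider
EOF

# Connector-side definition: the worker resolves ${env:...} placeholders from
# its own environment. The quoted heredoc stops the shell from expanding them.
cat > mongo-sink-env.json <<'EOF'
{
  "name": "example-mongo-sink-env",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "example_processed_topic",
    "connection.uri": "mongodb://${env:MONGO_USER}:${env:MONGO_PASSWORD}@mongo:27017",
    "database": "example_db",
    "collection": "example_collection"
  }
}
EOF
echo "wrote worker-config-provider.properties and mongo-sink-env.json"
```

For the placeholders to resolve, the referenced variables would have to be set in the environment of the Connect worker container, for example with additional `-e` flags on `docker run`.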
- Kafnus ecosystem overview
- PostGIS connector fork
- MongoDB connector docs
- Aiven HTTP Connector
- Aiven HTTP Connector fork
🧭 Project structure note
This repository is part of the Kafnus ecosystem: