Skip to content

Conversation

@liumingjian
Copy link

Summary

This PR adds a complete GaussDB CDC connector implementation with support for snapshot and streaming modes using GaussDB's mppdb_decoding logical replication plugin.

Key Features

  • ✅ Full snapshot support for initial data capture
  • ✅ Streaming CDC using GaussDB logical replication (mppdb_decoding)
  • ✅ Support for all common GaussDB data types
  • ✅ Configurable connection pooling and retry mechanisms
  • ✅ Comprehensive test suite

Critical Bug Fixes

1. Default Value Converter Issue

  • Problem: GaussDB returns function calls (e.g., pg_systimestamp(), CURRENT_TIMESTAMP) as default values, causing Debezium to fail when trying to use them as actual values
  • Solution: Created GaussDBDefaultValueConverter that properly handles function-based defaults by returning Optional.empty() for them
  • Impact: Fixes schema building errors that prevented connector initialization

2. Missing Source Info Fields

  • Problem: Debezium envelope requires multiple fields (version, connector, name, snapshot) in the source struct, but they were not being set, causing validation errors
  • Solution: Added all required source info fields to both snapshot and streaming source struct builders
  • Impact: Fixes runtime errors during data capture

Implementation Details

Core Components

  • GaussDBSource: Main source implementation extending IncrementalSource
  • GaussDBDialect: Dialect for GaussDB-specific SQL and behavior
  • GaussDBConnection: Connection management with retry logic
  • GaussDBReplicationConnection: Logical replication connection handling
  • GaussDBScanFetchTask: Snapshot data reading with JDBC
  • GaussDBStreamFetchTask: Streaming CDC data reading via logical replication

Configuration Options

  • Hostname, port, database, username, password
  • Plugin name (mppdb_decoding)
  • Slot name for logical replication
  • Connection timeout and retry settings
  • Snapshot fetch size
  • Table include/exclude patterns

Testing

  • ✅ Unit tests for all major components
  • ✅ Integration tests for snapshot and streaming modes
  • ✅ Data type compatibility tests
  • ✅ Boundary condition tests
  • ⚠️ One integration test has timeout issue (under investigation)

Verified Configuration

  • ✅ GaussDB wal_level = logical (required for CDC)
  • mppdb_decoding plugin available and functional
  • ✅ Replication slot creation and management working

Known Issues

  • Integration test testReadSingleTableAllRecords times out (fetch task execution issue under investigation)
  • This appears to be a Flink job initialization issue rather than a data reading problem
  • All schema/envelope validation errors have been resolved

Test Plan

  • Unit tests pass
  • Code formatting (spotless) passes
  • Integration test investigation ongoing
  • Manual testing with real GaussDB instance successful

Dependencies

  • GaussDB JDBC driver (included in lib/)
  • Debezium PostgreSQL connector (for replication protocol compatibility)
  • Flink CDC base framework

Documentation

  • README with usage examples
  • Docker Compose setup for local testing
  • Troubleshooting guide
  • Connectivity diagnosis guide

Checklist

  • Code follows project style guidelines
  • Added comprehensive tests
  • Added documentation
  • Fixed critical bugs (default value converter, source info fields)
  • All tests passing (1 integration test timeout under investigation)

🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 [email protected]

## Summary
Add complete GaussDB CDC connector implementation with support for snapshot and streaming modes using GaussDB's mppdb_decoding logical replication plugin.

## Key Features
- Full snapshot support for initial data capture
- Streaming CDC using GaussDB logical replication
- Support for all common GaussDB data types
- Configurable connection pooling and retry mechanisms
- Comprehensive test suite

## Critical Bug Fixes

### 1. Default Value Converter Issue
- **Problem**: GaussDB returns function calls (e.g., `pg_systimestamp()`, `CURRENT_TIMESTAMP`) as default values, causing Debezium to fail when trying to use them as actual values
- **Solution**: Created `GaussDBDefaultValueConverter` that properly handles function-based defaults by returning `Optional.empty()` for them
- **Files**:
  - `io/debezium/connector/gaussdb/GaussDBDefaultValueConverter.java` (new)
  - `io/debezium/connector/gaussdb/GaussDBSchema.java` (modified)

### 2. Missing Source Info Fields
- **Problem**: Debezium envelope requires multiple fields (version, connector, name, snapshot) in the source struct, but they were not being set, causing validation errors
- **Solution**: Added all required source info fields to both snapshot and streaming source struct builders
- **Files**:
  - `GaussDBScanFetchTask.java:294-339` (snapshot)
  - `GaussDBStreamFetchTask.java:495-551` (streaming)

## Implementation Details

### Core Components
- **GaussDBSource**: Main source implementation
- **GaussDBDialect**: Dialect for GaussDB-specific SQL and behavior
- **GaussDBConnection**: Connection management with retry logic
- **GaussDBReplicationConnection**: Logical replication connection handling
- **GaussDBScanFetchTask**: Snapshot data reading
- **GaussDBStreamFetchTask**: Streaming CDC data reading

### Configuration
- Hostname, port, database, username, password
- Plugin name (mppdb_decoding)
- Slot name for logical replication
- Connection timeout and retry settings
- Snapshot fetch size

### Testing
- Unit tests for all major components
- Integration tests for snapshot and streaming modes
- Data type compatibility tests
- Boundary condition tests

## Verified Configuration
- ✅ GaussDB `wal_level = logical` (required for CDC)
- ✅ `mppdb_decoding` plugin available and functional
- ✅ Replication slot creation and management working

## Known Issues
- Integration test timeout issue under investigation (fetch task execution)
- Requires further debugging of Flink job initialization flow

## Dependencies
- GaussDB JDBC driver (included in lib/)
- Debezium PostgreSQL connector (for replication protocol)
- Flink CDC base framework

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
- Add SERIAL/BIGSERIAL support in GaussDBTypeUtils
- Bundle PostgreSQL driver in flink-sql-connector-gaussdb-cdc to avoid NoClassDefFoundError
- Implement GaussDbE2eITCase with robust replication slot cleanup
- Improve replication connection stability with KeepAlive and socket timeout settings
- Add diagnostic logging for GaussDB replication stream
…eption

- Add getSlotNameForBackfillTask() to GaussDBSourceConfig returning slotName_backfill
- Add backfillReplicationConnection field to GaussDBSourceFetchTaskContext
- Add isBoundedRead() method to detect bounded reads (SnapshotSplit or StreamSplit with endOffset)
- Modify createReplicationStream() to use backfill connection for bounded reads
- Add endingPos field and reachEnd() check for bounded stream reads
- Add 10ms stabilization delay after stream.start()

This fixes the replication slot contention issue where GaussDB would fail with
EOFException when trying to restart the replication stream during backfill phase.

Tested E2E:
- Snapshot: 2 records captured successfully
- INSERT: id=103 captured and synced to MySQL
- UPDATE: id=101 value change captured and synced
… support

- Resolved TableId catalog mismatch in GaussDBSourceFetchTaskContext
- Implemented robust polling mechanism in E2E tests for reliable verification
- Added native UTF-8 encoding support for Chinese character synchronization
- Enhanced documentation including READMEs and setup guides
- Cleaned up duplicate JAR conflicts in deployment process
@github-actions github-actions bot added docs Improvements or additions to documentation base labels Dec 24, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

base debezium docs Improvements or additions to documentation e2e-tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant