@xxntti3n xxntti3n commented Sep 9, 2025

This PR adds support for per-table chunk key column configuration in the PostgreSQL CDC connector, enabling fine-grained control over incremental snapshot
chunking. Previously, all tables shared the same chunk key column, which was inefficient for heterogeneous table schemas.

Key Changes

  • ✨ Added chunk key column parsing logic to PostgreSQL DataSourceFactory
  • 🔧 Implemented comma-separated configuration format support

Example Usage

sourceConf:
  scan.incremental.snapshot.chunk.key-column: public.action_logs:created_at,public.service_logs:created_at
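
For illustration only, the comma-separated `table:column` format above could be parsed roughly as in the sketch below. This is not the code from this PR: `ChunkKeyColumnParser` and `parse` are hypothetical names, and the sketch assumes each entry has the form `schema.table:column`, with the last `:` separating the table from the column.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not the connector's actual class. */
final class ChunkKeyColumnParser {

    private ChunkKeyColumnParser() {}

    /**
     * Parses "schema.table:column,schema.table:column" into an ordered map
     * from table identifier to chunk key column.
     */
    static Map<String, String> parse(String value) {
        Map<String, String> result = new LinkedHashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return result; // nothing configured
        }
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            // Assumption: the last ':' separates the table from the column.
            int sep = trimmed.lastIndexOf(':');
            if (sep <= 0 || sep == trimmed.length() - 1) {
                throw new IllegalArgumentException(
                        "Expected 'schema.table:column' but got '" + trimmed + "'");
            }
            String table = trimmed.substring(0, sep).trim();
            String column = trimmed.substring(sep + 1).trim();
            result.put(table, column);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> parsed =
                parse("public.action_logs:created_at,public.service_logs:created_at");
        parsed.forEach((table, column) -> System.out.println(table + " -> " + column));
    }
}
```

Rejecting malformed entries early, rather than silently skipping them, makes a typo in the configuration visible at startup instead of during the snapshot.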

@lvyanquan
Contributor

Please add more description about this change.

@Mrart
Contributor

Mrart commented Sep 19, 2025

@xxntti3n I'm trying to understand what you want to do. Do you want to configure a separate chunk key column for each schema.table? What's wrong with the existing chunkKeyColumn configuration?

@xxntti3n xxntti3n force-pushed the universal-chunk-column-dev branch 4 times, most recently from 5314014 to 4fc6319 Compare September 23, 2025 16:45
@xxntti3n
Author

xxntti3n commented Sep 25, 2025

> @xxntti3n I'm trying to understand what you want to do, do you want to configure a separate chunk key column for schema.table? What's wrong with the previous chunkkeycolumn configuration?

Yes, I want to configure chunk key columns for multiple tables, as the MySQL dialect allows. Example configuration: scan.incremental.snapshot.chunk.key-column: public.table1:created_at,public.table2:created_at. Currently, Postgres supports only one table.

@xxntti3n xxntti3n force-pushed the universal-chunk-column-dev branch from 2742ce7 to 956c4e0 Compare September 25, 2025 07:44
@github-actions github-actions bot added the docs Improvements or additions to documentation label Sep 25, 2025
@xxntti3n
Author

Hi @lvyanquan, @Mrart - I've added the PR description and would appreciate your review. Looking forward to your feedback. Thanks!

@xxntti3n xxntti3n changed the title add handle chunkKeyColumn for Postgres Implement comma-separated parsing for chunk key columns Sep 25, 2025
new PostgresIncrementalSource<>(
        configFactory, checkNotNull(deserializer), offsetFactory, dialect);

return source;
Contributor

Why do we need to change this? What was wrong with returning the new source directly?

@Internal
public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {

    private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceFetchTaskContext.class);
Contributor

Is this LOG actually used anywhere?


if (tableName != null && !tableName.equalsIgnoreCase(table)) {
    return false;
}
Contributor

Can we extract the test functions?
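
One way to read this suggestion is to pull the null-tolerant, case-insensitive comparison into a small helper. The sketch below is only an illustration: `TableNameMatcher`, `matchesOrUnset`, and the `schemaName`/`schema` parameters are hypothetical names that do not appear in the diff, and it assumes a `null` expected name means "match anything".

```java
/** Hypothetical helper illustrating the extracted check; names are assumptions. */
final class TableNameMatcher {

    private TableNameMatcher() {}

    /** A null expected name acts as a wildcard; otherwise compare case-insensitively. */
    static boolean matchesOrUnset(String expected, String actual) {
        return expected == null || expected.equalsIgnoreCase(actual);
    }

    /** Assumed combination: both the schema and the table must match. */
    static boolean matches(String schemaName, String tableName, String schema, String table) {
        return matchesOrUnset(schemaName, schema) && matchesOrUnset(tableName, table);
    }

    public static void main(String[] args) {
        System.out.println(matches("public", "Action_Logs", "PUBLIC", "action_logs"));
        System.out.println(matches(null, "action_logs", "public", "service_logs"));
    }
}
```

With the comparison isolated like this, the null and case-insensitivity rules can be unit-tested without building a full table filter.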

if (keyStruct != null && keyStruct.schema().field(splitFieldName) != null) {
    return new Object[] {keyStruct.get(splitFieldName)};
}
LOG.info("Get Split Key From Value {} {}", dataRecord, splitFieldName);
Contributor

Should this be logged at debug level instead?

Struct value = (Struct) dataRecord.value();
if (value == null) {
    return null; // No value struct available
}
Contributor

Could dataRecord itself be null here?
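
To make the question concrete, here is a rough sketch of the key-then-value lookup with an explicit guard on the record itself. It is not the PR's code: plain `Map`s stand in for the Kafka Connect `Struct`s, `Rec` and `SplitKeyLookup` are hypothetical names, and it assumes the elided remainder of the method reads the split field from the value.

```java
import java.util.Map;

/** Hypothetical stand-in for a record with key and value structs. */
final class Rec {
    final Map<String, Object> key;
    final Map<String, Object> value;

    Rec(Map<String, Object> key, Map<String, Object> value) {
        this.key = key;
        this.value = value;
    }
}

/** Hypothetical sketch of the lookup; Maps replace Struct for self-containment. */
final class SplitKeyLookup {

    private SplitKeyLookup() {}

    /** Returns the split key from the record key if present, else from the value, else null. */
    static Object[] splitKey(Rec dataRecord, String splitFieldName) {
        if (dataRecord == null) {
            return null; // guard the record itself before touching key or value
        }
        if (dataRecord.key != null && dataRecord.key.containsKey(splitFieldName)) {
            return new Object[] {dataRecord.key.get(splitFieldName)};
        }
        // Assumption: fall back to the value when the key lacks the field.
        if (dataRecord.value == null || !dataRecord.value.containsKey(splitFieldName)) {
            return null; // no value struct, or the field is absent
        }
        return new Object[] {dataRecord.value.get(splitFieldName)};
    }
}
```

Whether the guard is needed depends on whether callers can ever pass a null record; if they cannot, a precondition check may be the clearer choice.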


private int lsnCommitCheckpointsDelay;

private Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
Contributor

Is ConcurrentHashMap more appropriate?

Labels

base docs Improvements or additions to documentation postgres-cdc-connector
