Implement comma-separated parsing for chunk key columns #4115

Conversation
Please add more description about this change.
@xxntti3n I'm trying to understand what you want to do. Do you want to configure a separate chunk key column for each schema.table? What's wrong with the previous chunk key column configuration?
Yes, I want to configure multiple tables, like the MySQL dialect supports. Example config: `scan.incremental.snapshot.chunk.key-column: public.table1:created_at,public.table2:created_at`. Currently, Postgres only supports one table.
Hi @lvyanquan, @Mrart - I've added the PR description and would appreciate your review. Looking forward to your feedback. Thanks!
```java
new PostgresIncrementalSource<>(
        configFactory, checkNotNull(deserializer), offsetFactory, dialect);
// ...
return source;
```
Why do we need to change this? Returning through a local variable doesn't seem like an improvement over returning directly.
```java
@Internal
public abstract class JdbcSourceFetchTaskContext implements FetchTask.Context {

    private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceFetchTaskContext.class);
```
Is this LOG actually called anywhere?
```java
if (tableName != null && !tableName.equalsIgnoreCase(table)) {
    return false;
}
```
Can we extract this into a separate function and add tests for it?
```java
if (keyStruct != null && keyStruct.schema().field(splitFieldName) != null) {
    return new Object[] {keyStruct.get(splitFieldName)};
}
LOG.info("Get Split Key From Value {} {}", dataRecord, splitFieldName);
```
Does this need to be logged here at all? If so, DEBUG level seems more appropriate than INFO.
```java
Struct value = (Struct) dataRecord.value();
if (value == null) {
    return null; // No value struct available
}
```
Is dataRecord guaranteed to be non-null here?
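For orientation, the hunks above implement a key-first, value-fallback lookup of the split key. Below is a minimal sketch of that pattern, assuming the Kafka Connect Struct/SourceRecord API; the class and method names are illustrative, not the PR's actual code, and it includes the dataRecord null guard the comment above asks about.

```java
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

// Illustrative sketch only: resolve the chunk/split key for one field from a
// change record, preferring the record key and falling back to the record value.
public final class SplitKeyExtractor {

    private SplitKeyExtractor() {}

    public static Object[] extractSplitKey(SourceRecord dataRecord, String splitFieldName) {
        if (dataRecord == null) {
            return null; // hypothetical guard for the case raised in the review
        }
        // Prefer the record key struct when it carries the split field.
        Struct keyStruct = (Struct) dataRecord.key();
        if (keyStruct != null && keyStruct.schema().field(splitFieldName) != null) {
            return new Object[] {keyStruct.get(splitFieldName)};
        }
        // Fall back to the value struct, guarding against missing value or field.
        Struct value = (Struct) dataRecord.value();
        if (value == null || value.schema().field(splitFieldName) == null) {
            return null;
        }
        return new Object[] {value.get(splitFieldName)};
    }
}
```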
```java
private int lsnCommitCheckpointsDelay;

private Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
```
Is ConcurrentHashMap more appropriate?
This PR adds support for per-table chunk key column configuration in the PostgreSQL CDC connector, enabling fine-grained control over incremental snapshot
chunking. Previously, all tables shared the same chunk key column, which was inefficient for heterogeneous table schemas.
Key Changes
- Parse comma-separated schema.table:column entries from scan.incremental.snapshot.chunk.key-column into a per-table map (Map<ObjectPath, String> chunkKeyColumns), matching the format already supported by the MySQL connector.
- Resolve the chunk key column per table during incremental snapshot splitting, instead of applying a single column to every table.
Example Usage
```yaml
sourceConf:
  scan.incremental.snapshot.chunk.key-column: public.action_logs:created_at,public.service_logs:created_at
```
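For readers skimming the diff, here is a minimal sketch of the comma-separated parsing the PR describes. It is an illustration under assumed names (ChunkKeyColumnParser and parseChunkKeyColumns are hypothetical), not the connector's actual implementation; only the option format and the Map<ObjectPath, String> target type come from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.catalog.ObjectPath;

// Hypothetical sketch: parse "schema.table:column,schema.table:column"
// into a per-table chunk-key map, as the PR description outlines.
public final class ChunkKeyColumnParser {

    private ChunkKeyColumnParser() {}

    public static Map<ObjectPath, String> parseChunkKeyColumns(String option) {
        Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
        if (option == null || option.trim().isEmpty()) {
            return chunkKeyColumns;
        }
        for (String entry : option.split(",")) {
            String[] tableAndColumn = entry.trim().split(":");
            if (tableAndColumn.length != 2) {
                throw new IllegalArgumentException(
                        "Invalid chunk key column entry, expected schema.table:column but got: "
                                + entry);
            }
            // ObjectPath.fromString expects a "schema.table" full name.
            chunkKeyColumns.put(
                    ObjectPath.fromString(tableAndColumn[0].trim()),
                    tableAndColumn[1].trim());
        }
        return chunkKeyColumns;
    }
}
```

With the example configuration above, such a parser would map public.action_logs to created_at and public.service_logs to created_at.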