Skip to content

Commit 64e90d6

Browse files
[flink] Support cdc source (#6606)
1 parent 87ee257 commit 64e90d6

File tree

36 files changed

+4685
-178
lines changed

36 files changed

+4685
-178
lines changed
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
---
2+
title: "Flink CDC Paimon Pipeline Connector"
3+
weight: 10
4+
type: docs
5+
aliases:
6+
- /cdc-ingestion/flink-cdc-paimon-pipeline-connector.html
7+
---
8+
<!--
9+
Licensed to the Apache Software Foundation (ASF) under one
10+
or more contributor license agreements. See the NOTICE file
11+
distributed with this work for additional information
12+
regarding copyright ownership. The ASF licenses this file
13+
to you under the Apache License, Version 2.0 (the
14+
"License"); you may not use this file except in compliance
15+
with the License. You may obtain a copy of the License at
16+
17+
http://www.apache.org/licenses/LICENSE-2.0
18+
19+
Unless required by applicable law or agreed to in writing,
20+
software distributed under the License is distributed on an
21+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
22+
KIND, either express or implied. See the License for the
23+
specific language governing permissions and limitations
24+
under the License.
25+
-->
26+
27+
# Flink CDC Paimon Pipeline Connector
28+
29+
Flink CDC is a streaming data integration tool for the Flink engine. It allows users to describe their ETL pipeline
30+
logic via YAML elegantly and help users automatically generating customized Flink operators and submitting job.
31+
32+
The Paimon Pipeline connector can be used as both the Data Source or the Data Sink of the Flink CDC pipeline. This
33+
document describes how to set up the Paimon Pipeline connector as the Data Source. If you are interested in using
34+
Paimon as the Data Sink, please refer to Flink CDC's
35+
[Paimon Pipeline Connector](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.5/docs/connectors/pipeline-connectors/paimon/)
36+
document.
37+
38+
## What can the connector do?
39+
40+
* Synchronizes data from a Paimon warehouse, database or table to an external system supported by Flink CDC
41+
* Synchronizes schema changes
42+
* Automatically discovers newly created tables in the source Paimon warehouse.
43+
44+
How to create Pipeline
45+
----------------
46+
47+
The pipeline for reading data from Paimon and sink to Doris can be defined as follows:
48+
49+
```yaml
50+
source:
51+
type: paimon
52+
name: Paimon Source
53+
database: default
54+
table: test_table
55+
catalog.properties.metastore: filesystem
56+
catalog.properties.warehouse: /path/warehouse
57+
58+
sink:
59+
type: doris
60+
name: Doris Sink
61+
fenodes: 127.0.0.1:8030
62+
username: root
63+
password: pass
64+
65+
pipeline:
66+
name: Paimon to Doris Pipeline
67+
parallelism: 2
68+
```
69+
70+
Pipeline Connector Options
71+
----------------
72+
73+
{{< generated/cdc_configuration >}}
74+
75+
Catalog Options
76+
----------------
77+
78+
Apart from the pipeline connector options described above, in the CDC yaml file you can also configure options that
79+
starts with `catalog.properties.`. For example, `catalog.properties.warehouse` or `catalog.properties.metastore`. Such
80+
options will have their prefix removed and the rest be regarded as catalog options. Please refer to the
81+
[Configurations]({{< ref "maintenance/configurations" >}}) section for catalog options available.
82+
83+
Usage Notes
84+
--------
85+
86+
* Data updates for primary key tables (-U, +U) will be replaced with -D and +I.
87+
* Does not support dropping tables. If you need to drop a table from the Paimon warehouse, please restart the Flink CDC job after performing the drop operation. When the job restarts, it will stop reading data from the dropped table, and the target table in the external system will remain unchanged from its state before the job was stopped.
88+
* Data from the same table will be consumed by the same Flink source subtask. If the amount of data varies significantly across different tables, performance bottlenecks caused by data skew may be observed in Flink CDC jobs.
89+
* If the CDC job has consumed up to the latest snapshot of a table and the next snapshot is not available yet, the monitoring and consumption of this table may be temporarily paused until `continuous.discovery-interval` has passed.
90+
91+
Data Type Mapping
92+
----------------
93+
<div class="wy-table-responsive">
94+
<table class="colwidths-auto docutils">
95+
<thead>
96+
<tr>
97+
<th class="text-left">Paimon type</th>
98+
<th class="text-left">CDC type</th>
99+
<th class="text-left" style="width:60%;">NOTE</th>
100+
</tr>
101+
</thead>
102+
<tbody>
103+
<tr>
104+
<td>TINYINT</td>
105+
<td>TINYINT</td>
106+
<td></td>
107+
</tr>
108+
<tr>
109+
<td>SMALLINT</td>
110+
<td>SMALLINT</td>
111+
<td></td>
112+
</tr>
113+
<tr>
114+
<td>INT</td>
115+
<td>INT</td>
116+
<td></td>
117+
</tr>
118+
<tr>
119+
<td>BIGINT</td>
120+
<td>BIGINT</td>
121+
<td></td>
122+
</tr>
123+
<tr>
124+
<td>FLOAT</td>
125+
<td>FLOAT</td>
126+
<td></td>
127+
</tr>
128+
<tr>
129+
<td>DOUBLE</td>
130+
<td>DOUBLE</td>
131+
<td></td>
132+
</tr>
133+
<tr>
134+
<td>DECIMAL(p, s)</td>
135+
<td>DECIMAL(p, s)</td>
136+
<td></td>
137+
</tr>
138+
<tr>
139+
<td>BOOLEAN</td>
140+
<td>BOOLEAN</td>
141+
<td></td>
142+
</tr>
143+
<tr>
144+
<td>DATE</td>
145+
<td>DATE</td>
146+
<td></td>
147+
</tr>
148+
<tr>
149+
<td>TIMESTAMP</td>
150+
<td>TIMESTAMP</td>
151+
<td></td>
152+
</tr>
153+
<tr>
154+
<td>TIMESTAMP_LTZ</td>
155+
<td>TIMESTAMP_LTZ</td>
156+
<td></td>
157+
</tr>
158+
<tr>
159+
<td>CHAR(n)</td>
160+
<td>CHAR(n)</td>
161+
<td></td>
162+
</tr>
163+
<tr>
164+
<td>VARCHAR(n)</td>
165+
<td>VARCHAR(n)</td>
166+
<td></td>
167+
</tr>
168+
</tbody>
169+
</table>
170+
</div>
171+
172+
{{< top >}}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
{{/*
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
*/}}
19+
<table class="configuration table table-bordered">
20+
<thead>
21+
<tr>
22+
<th class="text-left" style="width: 20%">Key</th>
23+
<th class="text-left" style="width: 15%">Default</th>
24+
<th class="text-left" style="width: 10%">Type</th>
25+
<th class="text-left" style="width: 55%">Description</th>
26+
</tr>
27+
</thead>
28+
<tbody>
29+
<tr>
30+
<td><h5>database</h5></td>
31+
<td style="word-wrap: break-word;">(none)</td>
32+
<td>String</td>
33+
<td>Name of the database to be scanned. By default, all databases will be scanned.</td>
34+
</tr>
35+
<tr>
36+
<td><h5>table</h5></td>
37+
<td style="word-wrap: break-word;">(none)</td>
38+
<td>String</td>
39+
<td>Name of the table to be scanned. By default, all tables will be scanned.</td>
40+
</tr>
41+
<tr>
42+
<td><h5>table.discovery-interval</h5></td>
43+
<td style="word-wrap: break-word;">1 min</td>
44+
<td>Duration</td>
45+
<td>The discovery interval of new tables. Only effective when database or table is not set.</td>
46+
</tr>
47+
</tbody>
48+
</table>

paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ public class ConfigOptionsDocGenerator {
8383
"paimon-flink/paimon-flink-common", "org.apache.paimon.flink"),
8484
new OptionsClassLocation(
8585
"paimon-flink/paimon-flink-cdc", "org.apache.paimon.flink.kafka"),
86+
new OptionsClassLocation(
87+
"paimon-flink/paimon-flink-cdc", "org.apache.paimon.flink.pipeline.cdc"),
8688
new OptionsClassLocation(
8789
"paimon-hive/paimon-hive-catalog", "org.apache.paimon.hive"),
8890
new OptionsClassLocation(

paimon-flink/paimon-flink-cdc/pom.xml

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,31 @@ under the License.
9292

9393
<!-- CDC dependencies -->
9494

95+
<dependency>
96+
<groupId>org.apache.flink</groupId>
97+
<artifactId>flink-cdc-common</artifactId>
98+
<version>${flink.cdc.version}</version>
99+
</dependency>
100+
101+
<dependency>
102+
<groupId>org.apache.flink</groupId>
103+
<artifactId>flink-cdc-runtime</artifactId>
104+
<version>${flink.cdc.version}</version>
105+
<exclusions>
106+
<exclusion>
107+
<groupId>org.apache.calcite</groupId>
108+
<artifactId>calcite-core</artifactId>
109+
</exclusion>
110+
</exclusions>
111+
</dependency>
112+
113+
<dependency>
114+
<groupId>org.apache.flink</groupId>
115+
<artifactId>flink-connector-files</artifactId>
116+
<version>${flink.version}</version>
117+
<scope>provided</scope>
118+
</dependency>
119+
95120
<dependency>
96121
<groupId>org.apache.flink</groupId>
97122
<artifactId>flink-connector-postgres-cdc</artifactId>
@@ -220,13 +245,6 @@ under the License.
220245
<type>test-jar</type>
221246
</dependency>
222247

223-
<dependency>
224-
<groupId>org.apache.flink</groupId>
225-
<artifactId>flink-connector-files</artifactId>
226-
<version>${flink.version}</version>
227-
<scope>test</scope>
228-
</dependency>
229-
230248
<dependency>
231249
<groupId>org.apache.flink</groupId>
232250
<artifactId>flink-json</artifactId>
@@ -307,8 +325,30 @@ under the License.
307325
</exclusion>
308326
</exclusions>
309327
</dependency>
328+
<dependency>
329+
<groupId>org.apache.flink</groupId>
330+
<artifactId>flink-cdc-composer</artifactId>
331+
<version>${flink.cdc.version}</version>
332+
<scope>test</scope>
333+
</dependency>
334+
<dependency>
335+
<groupId>org.apache.flink</groupId>
336+
<artifactId>flink-cdc-pipeline-connector-values</artifactId>
337+
<version>${flink.cdc.version}</version>
338+
<scope>test</scope>
339+
</dependency>
310340
</dependencies>
311341

342+
<!-- <dependencyManagement>-->
343+
<!-- <dependencies>-->
344+
<!-- <dependency>-->
345+
<!-- <groupId>com.google.guava</groupId>-->
346+
<!-- <artifactId>guava</artifactId>-->
347+
<!-- <version>32.1.2-jre</version>-->
348+
<!-- </dependency>-->
349+
<!-- </dependencies>-->
350+
<!-- </dependencyManagement>-->
351+
312352
<build>
313353
<plugins>
314354
<plugin>

0 commit comments

Comments
 (0)