Skip to content

Commit 108225d

Browse files
authored
Add Iceberg CDC support to YAML (apache#36641)
* Add Iceberg CDC support to YAML and Blueprints * Fix Lint * Add Filters to integration test * Fix Mock Tests * Remove Iceberg Blueprints from Beam Repo * Remove mock tests * Adding timestamps * Add Streaming test
1 parent 614e742 commit 108225d

File tree

2 files changed

+81
-1
lines changed

2 files changed

+81
-1
lines changed

sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,4 +60,56 @@ pipelines:
6060
- {label: "389a", rank: 2}
6161
options:
6262
project: "apache-beam-testing"
63-
temp_location: "{TEMP_DIR}"
63+
temp_location: "{TEMP_DIR}"
64+
65+
- name: read_cdc_batch
66+
pipeline:
67+
type: chain
68+
transforms:
69+
- type: ReadFromIcebergCDC
70+
config:
71+
table: db.labels
72+
catalog_name: hadoop_catalog
73+
catalog_properties:
74+
type: hadoop
75+
warehouse: "{TEMP_DIR}"
76+
from_timestamp: 1762819200000
77+
to_timestamp: 2078352000000
78+
filter: '"label" = ''11a'' or "rank" = 1'
79+
keep:
80+
- label
81+
- rank
82+
- type: AssertEqual
83+
config:
84+
elements:
85+
- {label: "11a", rank: 0}
86+
- {label: "37a", rank: 1}
87+
options:
88+
project: "apache-beam-testing"
89+
temp_location: "{TEMP_DIR}"
90+
91+
- name: read_cdc_streaming
92+
pipeline:
93+
type: chain
94+
transforms:
95+
- type: ReadFromIcebergCDC
96+
config:
97+
table: db.labels
98+
catalog_name: hadoop_catalog
99+
catalog_properties:
100+
type: hadoop
101+
warehouse: "{TEMP_DIR}"
102+
streaming: True
103+
to_timestamp: 2078352000000
104+
filter: '"label" = ''11a'' or "rank" = 1'
105+
keep:
106+
- label
107+
- rank
108+
- type: AssertEqual
109+
config:
110+
elements:
111+
- {label: "11a", rank: 0}
112+
- {label: "37a", rank: 1}
113+
options:
114+
project: "apache-beam-testing"
115+
temp_location: "{TEMP_DIR}"

sdks/python/apache_beam/yaml/standard_io.yaml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,3 +403,31 @@
403403
'WriteToBigTable': 'beam:schematransform:org.apache.beam:bigtable_write:v1'
404404
config:
405405
gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'
406+
407+
#IcebergCDC
408+
- type: renaming
409+
transforms:
410+
'ReadFromIcebergCDC': 'ReadFromIcebergCDC'
411+
config:
412+
mappings:
413+
'ReadFromIcebergCDC':
414+
table: 'table'
415+
catalog_name: 'catalog_name'
416+
catalog_properties: 'catalog_properties'
417+
config_properties: 'config_properties'
418+
drop: 'drop'
419+
filter: 'filter'
420+
from_snapshot: 'from_snapshot'
421+
from_timestamp: 'from_timestamp'
422+
keep: 'keep'
423+
poll_interval_seconds: 'poll_interval_seconds'
424+
starting_strategy: 'starting_strategy'
425+
streaming: 'streaming'
426+
to_snapshot: 'to_snapshot'
427+
to_timestamp: 'to_timestamp'
428+
underlying_provider:
429+
type: beamJar
430+
transforms:
431+
'ReadFromIcebergCDC': 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1'
432+
config:
433+
gradle_target: 'sdks:java:io:expansion-service:shadowJar'

0 commit comments

Comments
 (0)