Add Iceberg CDC support to YAML #36641
base: master
Changes from 5 commits
Python test-data changes:

@@ -196,6 +196,23 @@ def shipments_data():
  }]


def shipment_data_filtered():
  return [{
      'customer_id': 'C1',
      'shipment_date': '2023-05-01',
      'shipment_cost': 150.0,
      'customer_name': 'Alice',
      'customer_email': '[email protected]'
  },
  {
      'customer_id': 'C1',
      'shipment_date': '2023-05-10',
      'shipment_cost': 20.0,
      'customer_name': 'Alice',
      'customer_email': '[email protected]'
  }]


def bigtable_data():
  return [{
      'product_id': '1', 'product_name': 'pixel 5', 'product_stock': '2'
New YAML example file (batch Iceberg CDC read):

@@ -0,0 +1,62 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# A pipeline that reads records from a Medallion Bronze Iceberg dataset
# in a batch manner and writes to a Silver Iceberg dataset.

pipeline:
  type: chain
  transforms:
    # Step 1: Read Records from Iceberg Table within a time range
    - type: ReadFromIcebergCDC
      name: ReadFromBronzeDataset
      config:
        table: "shipment_dataset_bronze.shipments"
        catalog_name: "shipment_data"
        catalog_properties:
          type: "rest"
          uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
          warehouse: "gs://biglake_shipment_data"
          header.x-goog-user-project: "apache-beam-testing"
          rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"
          rest-metrics-reporting-enabled: "false"
        filter: '"customer_id" IS NOT NULL AND "customer_id" = ''C1'' AND "shipment_cost" > 0'
        keep:
          - "customer_id"
          - "shipment_date"
          - "shipment_cost"
          - "customer_name"
          - "customer_email"

    # Step 2: Write records out to Iceberg
    - type: WriteToIceberg
      name: WriteToSilverDataset
      config:
        table: "shipment_dataset_silver.shipments"
        catalog_name: "shipment_data"
        catalog_properties:
          type: "rest"
          uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
          warehouse: "gs://biglake_shipment_data"
          header.x-goog-user-project: "apache-beam-testing"
          rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"
          rest-metrics-reporting-enabled: "false"

# Expected:
#  Row(customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='[email protected]')
#  Row(customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='[email protected]')
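As a side note, one way to sanity-check the expected output above is to read the Silver table back and assert on the rows. The sketch below is illustrative only and not part of this PR; it assumes the existing Beam YAML ReadFromIceberg and AssertEqual transforms and simply reuses the catalog settings from the example.

# Illustrative verification sketch (not part of this PR): read the Silver table
# back with the standard ReadFromIceberg transform and assert the expected rows.
pipeline:
  type: chain
  transforms:
    - type: ReadFromIceberg
      config:
        table: "shipment_dataset_silver.shipments"
        catalog_name: "shipment_data"
        catalog_properties:
          type: "rest"
          uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
          warehouse: "gs://biglake_shipment_data"
          header.x-goog-user-project: "apache-beam-testing"
          rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"
    - type: AssertEqual
      config:
        elements:
          - {customer_id: "C1", shipment_date: "2023-05-01", shipment_cost: 150.0, customer_name: "Alice", customer_email: "[email protected]"}
          - {customer_id: "C1", shipment_date: "2023-05-10", shipment_cost: 20.0, customer_name: "Alice", customer_email: "[email protected]"}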
New YAML example file (streaming Iceberg CDC read):

@@ -0,0 +1,65 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# A pipeline that reads Append CDC events from Medallion Bronze Iceberg dataset
# and writes to Silver Iceberg dataset.

pipeline:
  type: chain
  transforms:
    # Step 1: Read CDC Append Records from Iceberg Table
    - type: ReadFromIcebergCDC
      name: ReadFromBronzeDataset
      config:
        table: "shipment_dataset_bronze.shipments"
        catalog_name: "shipment_data"
        catalog_properties:
          type: "rest"
          uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
          warehouse: "gs://biglake_shipment_data"
          header.x-goog-user-project: "apache-beam-testing"
          rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"
          rest-metrics-reporting-enabled: "false"
        poll_interval_seconds: 30
        streaming: true
        filter: '"customer_id" IS NOT NULL AND "customer_id" = ''C1'' AND "shipment_cost" > 0'
        keep:
          - "customer_id"
          - "shipment_date"
          - "shipment_cost"
          - "customer_name"
          - "customer_email"

    # Step 2: Write records out to Iceberg
    - type: WriteToIceberg
      name: WriteToSilverDataset
      config:
        table: "shipment_dataset_silver.shipments"
        catalog_name: "shipment_data"
        catalog_properties:
          type: "rest"
          uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
          warehouse: "gs://biglake_shipment_data"
          header.x-goog-user-project: "apache-beam-testing"
          rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"
          rest-metrics-reporting-enabled: "false"
        triggering_frequency_seconds: 30

# Expected:
#  Row(customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='[email protected]')
#  Row(customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='[email protected]')
Changes to the YAML integration tests:

@@ -60,4 +60,27 @@ pipelines:
          - {label: "389a", rank: 2}
    options:
      project: "apache-beam-testing"
      temp_location: "{TEMP_DIR}"

  - name: read_cdc
    pipeline:
      type: chain
      transforms:
        - type: ReadFromIcebergCDC
          config:
            table: db.labels
            catalog_name: hadoop_catalog
            catalog_properties:
              type: hadoop
              warehouse: "{TEMP_DIR}"
            filter: '"label" = ''11a'''
            keep:
              - label
              - rank
        - type: AssertEqual
          config:
            elements:
              - {label: "11a", rank: 0}
    options:
      project: "apache-beam-testing"
      temp_location: "{TEMP_DIR}"
Contributor:
Can we update the test to include more than one element? For example, a stream of data read from a predefined Iceberg table that spans multiple snapshots.

Contributor (Author):
Added a filter on timestamp for this decade :)) and also added multiple conditions to get more than one record. Could not do streaming or snapshot filters in the existing setup; a snapshot is catalog-level metadata.

Contributor (Author):
Update: adding a streaming pipeline integration test too, with a timestamp cut-off.
So it seems these are pure examples currently. Is the plan to parameterize them in the future so that they can be used in Flex Template and Job Builder blueprints? If so, can we add such parameterization now?
This is a good point. I think we need to consider that the golden blueprints that are used in the Dataflow templates repo have to be properly tested:
Ideally, we should probably have:
What do you think about this:
Thoughts?
That sounds good to me. Thanks!
I agree with these in the ideal scenario, but I think we cannot do exact integration tests for these YAML files, especially streaming pipelines, which never end. We are in any case writing integration tests in extended_tests that are close to the blueprints. Maybe we can add Jinja-parameterized blueprints directly to the blueprints folder?
Adding more: consider this CDC use case. First we need to have an Iceberg table, so we cannot just have a raw blueprint that is testable. We need to create such tables (which we already do in the integration tests) or maintain them; even so, I do not think every blueprint is testable on its own.
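As a rough illustration of the Jinja-parameterized blueprint idea mentioned above (a sketch only, not part of this PR; the variable names are made up), the bronze-to-silver example might be templated like the following, with values supplied at submission time, for example via Beam YAML's Jinja support (--jinja_variables):

# Hypothetical Jinja-parameterized blueprint sketch; variable names are illustrative.
pipeline:
  type: chain
  transforms:
    - type: ReadFromIcebergCDC
      config:
        table: "{{ BRONZE_TABLE }}"
        catalog_name: "{{ CATALOG_NAME }}"
        catalog_properties:
          type: "rest"
          uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
          warehouse: "{{ WAREHOUSE }}"
          header.x-goog-user-project: "{{ PROJECT }}"
          rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"
    - type: WriteToIceberg
      config:
        table: "{{ SILVER_TABLE }}"
        catalog_name: "{{ CATALOG_NAME }}"
        catalog_properties:
          type: "rest"
          uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
          warehouse: "{{ WAREHOUSE }}"
          header.x-goog-user-project: "{{ PROJECT }}"
          rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"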
I think it's good to consider developing the main pipeline and its testing separately.
The main pipeline should be something parameterized that we can use for templates / blueprints. This is our source of truth and contains the main code we need to keep correct via testing etc. It should not have specific parameter values (references to specific databases etc.) unless we want to make those the defaults for the template/blueprint.
A close-enough pipeline that we want to test is dynamically derived from the above. In some cases this can be the original pipeline (for example, Kafka, which can be run for a limited time). But for Pub/Sub we either need to inject mock transforms and/or cancel the pipeline using a runner-specific mechanism (for example, the Dataflow cancel operation [1]).
[1] https://docs.cloud.google.com/dataflow/docs/guides/stopping-a-pipeline
Chatted with Danny offline and already mentioned this to Tarun, but we are moving everything to the Dataflow repo as the source of truth to reduce redundancy and complexity.
As per the discussion, I have removed the blueprints from this repo but left the integration tests. Since we define the YAML -> SDK mapping in this repo, we should still continue adding these tests; blueprint-specific integration tests will go to the Templates repo.
@chamikaramj do we have to wait for a Beam release before the new YAML features added in this repo can be used from the Templates repo?
No, we can add CDC support for YAML and tests here.