From a9c8ac93f04b77822e47f93d08f8dbeba298660b Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Mon, 27 Oct 2025 16:37:47 -0700 Subject: [PATCH 1/8] Add Iceberg CDC support to YAML and Blueprints --- .../yaml/examples/testing/examples_test.py | 36 +++++++++- .../yaml/examples/testing/input_data.py | 17 +++++ .../blueprint/iceberg_to_iceberg_batch.yaml | 62 ++++++++++++++++++ .../iceberg_to_iceberg_streaming.yaml | 65 +++++++++++++++++++ .../extended_tests/databases/iceberg.yaml | 27 +++++++- sdks/python/apache_beam/yaml/standard_io.yaml | 28 ++++++++ 6 files changed, 233 insertions(+), 2 deletions(-) create mode 100644 sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_batch.yaml create mode 100644 sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_streaming.yaml diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py index 4f0516a1ea93..3815bf77bf6d 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py +++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py @@ -679,7 +679,9 @@ def _kafka_test_preprocessor( 'test_anomaly_scoring_yaml', 'test_wordCountInclude_yaml', 'test_wordCountImport_yaml', - 'test_iceberg_to_alloydb_yaml' + 'test_iceberg_to_alloydb_yaml', + 'test_iceberg_to_iceberg_streaming_yaml', + 'test_iceberg_to_iceberg_batch_yaml' ]) def _io_write_test_preprocessor( test_spec: dict, expected: List[str], env: TestEnvironment): @@ -792,6 +794,37 @@ def _iceberg_io_read_test_preprocessor( return test_spec +@YamlExamplesTestSuite.register_test_preprocessor([ + 'test_iceberg_to_iceberg_streaming_yaml', 'test_iceberg_to_iceberg_batch_yaml']) +def _iceberg_cdc_read_test_preprocessor( + test_spec: dict, expected: List[str], env: TestEnvironment): + """ + Preprocessor for tests that involve reading CDC events from Iceberg. + + This preprocessor replaces any ReadFromIcebergCDC transform with a Create + transform that reads from a predefined in-memory dictionary. This allows + the test to verify the pipeline's correctness without relying on Iceberg + tables stored externally. + """ + if pipeline := test_spec.get('pipeline', None): + for transform in pipeline.get('transforms', []): + if transform.get('type', '') == 'ReadFromIcebergCDC': + config = transform['config'] + db_name, table_name = config['table'].split('.') + + transform['type'] = 'Create' + transform['config'] = { + k: v + for k, v in config.items() if k.startswith('__') + } + transform['config']['elements'] = INPUT_TABLES[( + config['catalog_name'], + db_name, + table_name)] + + return test_spec + + @YamlExamplesTestSuite.register_test_preprocessor([ 'test_spanner_read_yaml', 'test_enrich_spanner_with_bigquery_yaml', @@ -1318,6 +1351,7 @@ def _jinja_preprocessor(raw_spec_string: str, test_name: str): ('orders-test', 'order-database', 'orders'): input_data. spanner_orders_data(), ('db', 'users', 'NY'): input_data.iceberg_dynamic_destinations_users_data(), + ('shipment_data', 'shipment_dataset_bronze', 'shipments'): input_data.shipment_data_filtered(), ('BigTable', 'beam-test', 'bigtable-enrichment-test'): input_data. bigtable_data(), ('BigQuery', 'ALL_TEST', 'customers'): input_data.bigquery_data(), diff --git a/sdks/python/apache_beam/yaml/examples/testing/input_data.py b/sdks/python/apache_beam/yaml/examples/testing/input_data.py index fb468567355d..115fcd5c0748 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/input_data.py +++ b/sdks/python/apache_beam/yaml/examples/testing/input_data.py @@ -196,6 +196,23 @@ def shipments_data(): }] +def shipment_data_filtered(): + return [{ + 'customer_id': 'C1', + 'shipment_date': '2023-05-01', + 'shipment_cost': 150.0, + 'customer_name': 'Alice', + 'customer_email': 'alice@example.com' + }, + { + 'customer_id': 'C1', + 'shipment_date': '2023-05-10', + 'shipment_cost': 20.0, + 'customer_name': 'Alice', + 'customer_email': 'alice@example.com' + }] + + def bigtable_data(): return [{ 'product_id': '1', 'product_name': 'pixel 5', 'product_stock': '2' diff --git a/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_batch.yaml b/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_batch.yaml new file mode 100644 index 000000000000..e8520ab88ab8 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_batch.yaml @@ -0,0 +1,62 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# A pipeline that reads records from a Medallion Bronze Iceberg dataset +# in a batch manner and writes to a Silver Iceberg dataset. + +pipeline: + type: chain + transforms: + # Step 1: Read Records from Iceberg Table within a time range + - type: ReadFromIcebergCDC + name: ReadFromBronzeDataset + config: + table: "shipment_dataset_bronze.shipments" + catalog_name: "shipment_data" + catalog_properties: + type: "rest" + uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog" + warehouse: "gs://biglake_shipment_data" + header.x-goog-user-project: "apache-beam-testing" + rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager" + rest-metrics-reporting-enabled: "false" + filter: '"customer_id" IS NOT NULL AND "customer_id" = ''C1'' AND "shipment_cost" > 0' + keep: + - "customer_id" + - "shipment_date" + - "shipment_cost" + - "customer_name" + - "customer_email" + + # Step 2: Write records out to Iceberg + - type: WriteToIceberg + name: WriteToSilverDataset + config: + table: "shipment_dataset_silver.shipments" + catalog_name: "shipment_data" + catalog_properties: + type: "rest" + uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog" + warehouse: "gs://biglake_shipment_data" + header.x-goog-user-project: "apache-beam-testing" + rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager" + rest-metrics-reporting-enabled: "false" + +# Expected: +# - Row(customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com') +# - Row(customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com') diff --git a/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_streaming.yaml b/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_streaming.yaml new file mode 100644 index 000000000000..19ad7b6a4d37 --- /dev/null +++ b/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_streaming.yaml @@ -0,0 +1,65 @@ +# coding=utf-8 +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# A pipeline that reads Append CDC events from Medallion Bronze Iceberg dataset +# and writes to Silver Iceberg dataset. + +pipeline: + type: chain + transforms: + # Step 1: Read CDC Append Records from Iceberg Table + - type: ReadFromIcebergCDC + name: ReadFromBronzeDataset + config: + table: "shipment_dataset_bronze.shipments" + catalog_name: "shipment_data" + catalog_properties: + type: "rest" + uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog" + warehouse: "gs://biglake_shipment_data" + header.x-goog-user-project: "apache-beam-testing" + rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager" + rest-metrics-reporting-enabled: "false" + poll_interval_seconds: 30 + streaming: true + filter: '"customer_id" IS NOT NULL AND "customer_id" = ''C1'' AND "shipment_cost" > 0' + keep: + - "customer_id" + - "shipment_date" + - "shipment_cost" + - "customer_name" + - "customer_email" + + # Step 2: Write records out to Iceberg + - type: WriteToIceberg + name: WriteToSilverDataset + config: + table: "shipment_dataset_silver.shipments" + catalog_name: "shipment_data" + catalog_properties: + type: "rest" + uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog" + warehouse: "gs://biglake_shipment_data" + header.x-goog-user-project: "apache-beam-testing" + rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager" + rest-metrics-reporting-enabled: "false" + triggering_frequency_seconds: 30 + +# Expected: +# - Row(customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com') +# - Row(customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com') diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml index d72688774dae..5b3800af83b1 100644 --- a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml +++ b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml @@ -60,4 +60,29 @@ pipelines: - {label: "389a", rank: 2} options: project: "apache-beam-testing" - temp_location: "{TEMP_DIR}" \ No newline at end of file + temp_location: "{TEMP_DIR}" + + + - name: read_cdc + pipeline: + type: chain + transforms: + - type: ReadFromIcebergCDC + config: + table: db.labels + catalog_name: hadoop_catalog + catalog_properties: + type: hadoop + warehouse: "{TEMP_DIR}" + keep: + - label + - rank + - type: AssertEqual + config: + elements: + - {label: "11a", rank: 0} + - {label: "37a", rank: 1} + - {label: "389a", rank: 2} + options: + project: "apache-beam-testing" + temp_location: "{TEMP_DIR}" diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml index 66f0c124b4cf..458d3d63e436 100644 --- a/sdks/python/apache_beam/yaml/standard_io.yaml +++ b/sdks/python/apache_beam/yaml/standard_io.yaml @@ -403,3 +403,31 @@ 'WriteToBigTable': 'beam:schematransform:org.apache.beam:bigtable_write:v1' config: gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' + +#IcebergCDC +- type: renaming + transforms: + 'ReadFromIcebergCDC': 'ReadFromIcebergCDC' + config: + mappings: + 'ReadFromIcebergCDC': + table: 'table' + catalog_name: 'catalog_name' + catalog_properties: 'catalog_properties' + config_properties: 'config_properties' + drop: 'drop' + filter: 'filter' + from_snapshot: 'from_snapshot' + from_timestamp: 'from_timestamp' + keep: 'keep' + poll_interval_seconds: 'poll_interval_seconds' + starting_strategy: 'starting_strategy' + streaming: 'streaming' + to_snapshot: 'to_snapshot' + to_timestamp: 'to_timestamp' + underlying_provider: + type: beamJar + transforms: + 'ReadFromIcebergCDC': 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1' + config: + gradle_target: 'sdks:java:io:expansion-service:shadowJar' From 38fe11f3525fe6b16826d9e84842668fa7b25cff Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Tue, 28 Oct 2025 08:58:08 -0700 Subject: [PATCH 2/8] Fix Lint --- .../yaml/examples/testing/examples_test.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py index 3815bf77bf6d..268903d8fcd2 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py +++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py @@ -795,7 +795,9 @@ def _iceberg_io_read_test_preprocessor( @YamlExamplesTestSuite.register_test_preprocessor([ - 'test_iceberg_to_iceberg_streaming_yaml', 'test_iceberg_to_iceberg_batch_yaml']) + 'test_iceberg_to_iceberg_streaming_yaml', + 'test_iceberg_to_iceberg_batch_yaml' +]) def _iceberg_cdc_read_test_preprocessor( test_spec: dict, expected: List[str], env: TestEnvironment): """ @@ -818,9 +820,7 @@ def _iceberg_cdc_read_test_preprocessor( for k, v in config.items() if k.startswith('__') } transform['config']['elements'] = INPUT_TABLES[( - config['catalog_name'], - db_name, - table_name)] + config['catalog_name'], db_name, table_name)] return test_spec @@ -1351,7 +1351,8 @@ def _jinja_preprocessor(raw_spec_string: str, test_name: str): ('orders-test', 'order-database', 'orders'): input_data. spanner_orders_data(), ('db', 'users', 'NY'): input_data.iceberg_dynamic_destinations_users_data(), - ('shipment_data', 'shipment_dataset_bronze', 'shipments'): input_data.shipment_data_filtered(), + ('shipment_data', 'shipment_dataset_bronze', 'shipments'): input_data. + shipment_data_filtered(), ('BigTable', 'beam-test', 'bigtable-enrichment-test'): input_data. bigtable_data(), ('BigQuery', 'ALL_TEST', 'customers'): input_data.bigquery_data(), From e32ed12787f576e23c55487e5a38cc88fe0589a9 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Wed, 29 Oct 2025 12:27:05 -0700 Subject: [PATCH 3/8] Add Filters to integration test --- .../apache_beam/yaml/extended_tests/databases/iceberg.yaml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml index 5b3800af83b1..163d86d155a4 100644 --- a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml +++ b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml @@ -62,7 +62,6 @@ pipelines: project: "apache-beam-testing" temp_location: "{TEMP_DIR}" - - name: read_cdc pipeline: type: chain @@ -74,6 +73,7 @@ pipelines: catalog_properties: type: hadoop warehouse: "{TEMP_DIR}" + filter: "label = '11a'" keep: - label - rank @@ -81,8 +81,6 @@ pipelines: config: elements: - {label: "11a", rank: 0} - - {label: "37a", rank: 1} - - {label: "389a", rank: 2} options: project: "apache-beam-testing" temp_location: "{TEMP_DIR}" From 3b44c3a3571fd41a7047e5cdb6ec26d4e6035de0 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Thu, 30 Oct 2025 09:52:49 -0700 Subject: [PATCH 4/8] Fix Mock Tests --- .../transforms/blueprint/iceberg_to_iceberg_batch.yaml | 4 ++-- .../transforms/blueprint/iceberg_to_iceberg_streaming.yaml | 4 ++-- .../apache_beam/yaml/extended_tests/databases/iceberg.yaml | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_batch.yaml b/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_batch.yaml index e8520ab88ab8..0d29612858bd 100644 --- a/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_batch.yaml +++ b/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_batch.yaml @@ -58,5 +58,5 @@ pipeline: rest-metrics-reporting-enabled: "false" # Expected: -# - Row(customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com') -# - Row(customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com') +# Row(customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com') +# Row(customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com') diff --git a/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_streaming.yaml b/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_streaming.yaml index 19ad7b6a4d37..5386dd780cb6 100644 --- a/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_streaming.yaml +++ b/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_streaming.yaml @@ -61,5 +61,5 @@ pipeline: triggering_frequency_seconds: 30 # Expected: -# - Row(customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com') -# - Row(customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com') +# Row(customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com') +# Row(customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com') diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml index 163d86d155a4..01687066a929 100644 --- a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml +++ b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml @@ -73,7 +73,7 @@ pipelines: catalog_properties: type: hadoop warehouse: "{TEMP_DIR}" - filter: "label = '11a'" + filter: '"label" = ''11a''' keep: - label - rank From 8768fe8a72ea8cfdf4b81a9c0e354c6d6dac3d92 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Mon, 10 Nov 2025 08:44:51 -0800 Subject: [PATCH 5/8] Remove Iceberg Blueprints from Beam Repo --- .../blueprint/iceberg_to_iceberg_batch.yaml | 62 ------------------ .../iceberg_to_iceberg_streaming.yaml | 65 ------------------- 2 files changed, 127 deletions(-) delete mode 100644 sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_batch.yaml delete mode 100644 sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_streaming.yaml diff --git a/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_batch.yaml b/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_batch.yaml deleted file mode 100644 index 0d29612858bd..000000000000 --- a/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_batch.yaml +++ /dev/null @@ -1,62 +0,0 @@ -# coding=utf-8 -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# A pipeline that reads records from a Medallion Bronze Iceberg dataset -# in a batch manner and writes to a Silver Iceberg dataset. - -pipeline: - type: chain - transforms: - # Step 1: Read Records from Iceberg Table within a time range - - type: ReadFromIcebergCDC - name: ReadFromBronzeDataset - config: - table: "shipment_dataset_bronze.shipments" - catalog_name: "shipment_data" - catalog_properties: - type: "rest" - uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog" - warehouse: "gs://biglake_shipment_data" - header.x-goog-user-project: "apache-beam-testing" - rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager" - rest-metrics-reporting-enabled: "false" - filter: '"customer_id" IS NOT NULL AND "customer_id" = ''C1'' AND "shipment_cost" > 0' - keep: - - "customer_id" - - "shipment_date" - - "shipment_cost" - - "customer_name" - - "customer_email" - - # Step 2: Write records out to Iceberg - - type: WriteToIceberg - name: WriteToSilverDataset - config: - table: "shipment_dataset_silver.shipments" - catalog_name: "shipment_data" - catalog_properties: - type: "rest" - uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog" - warehouse: "gs://biglake_shipment_data" - header.x-goog-user-project: "apache-beam-testing" - rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager" - rest-metrics-reporting-enabled: "false" - -# Expected: -# Row(customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com') -# Row(customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com') diff --git a/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_streaming.yaml b/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_streaming.yaml deleted file mode 100644 index 5386dd780cb6..000000000000 --- a/sdks/python/apache_beam/yaml/examples/transforms/blueprint/iceberg_to_iceberg_streaming.yaml +++ /dev/null @@ -1,65 +0,0 @@ -# coding=utf-8 -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# A pipeline that reads Append CDC events from Medallion Bronze Iceberg dataset -# and writes to Silver Iceberg dataset. - -pipeline: - type: chain - transforms: - # Step 1: Read CDC Append Records from Iceberg Table - - type: ReadFromIcebergCDC - name: ReadFromBronzeDataset - config: - table: "shipment_dataset_bronze.shipments" - catalog_name: "shipment_data" - catalog_properties: - type: "rest" - uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog" - warehouse: "gs://biglake_shipment_data" - header.x-goog-user-project: "apache-beam-testing" - rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager" - rest-metrics-reporting-enabled: "false" - poll_interval_seconds: 30 - streaming: true - filter: '"customer_id" IS NOT NULL AND "customer_id" = ''C1'' AND "shipment_cost" > 0' - keep: - - "customer_id" - - "shipment_date" - - "shipment_cost" - - "customer_name" - - "customer_email" - - # Step 2: Write records out to Iceberg - - type: WriteToIceberg - name: WriteToSilverDataset - config: - table: "shipment_dataset_silver.shipments" - catalog_name: "shipment_data" - catalog_properties: - type: "rest" - uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog" - warehouse: "gs://biglake_shipment_data" - header.x-goog-user-project: "apache-beam-testing" - rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager" - rest-metrics-reporting-enabled: "false" - triggering_frequency_seconds: 30 - -# Expected: -# Row(customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='alice@example.com') -# Row(customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='alice@example.com') From 7efe0878d827b9c21bbe0d3042e5803c1b724191 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Mon, 10 Nov 2025 08:53:34 -0800 Subject: [PATCH 6/8] Remove mock tests --- .../yaml/examples/testing/examples_test.py | 37 +------------------ .../yaml/examples/testing/input_data.py | 17 --------- 2 files changed, 1 insertion(+), 53 deletions(-) diff --git a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py index 268903d8fcd2..4f0516a1ea93 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/examples_test.py +++ b/sdks/python/apache_beam/yaml/examples/testing/examples_test.py @@ -679,9 +679,7 @@ def _kafka_test_preprocessor( 'test_anomaly_scoring_yaml', 'test_wordCountInclude_yaml', 'test_wordCountImport_yaml', - 'test_iceberg_to_alloydb_yaml', - 'test_iceberg_to_iceberg_streaming_yaml', - 'test_iceberg_to_iceberg_batch_yaml' + 'test_iceberg_to_alloydb_yaml' ]) def _io_write_test_preprocessor( test_spec: dict, expected: List[str], env: TestEnvironment): @@ -794,37 +792,6 @@ def _iceberg_io_read_test_preprocessor( return test_spec -@YamlExamplesTestSuite.register_test_preprocessor([ - 'test_iceberg_to_iceberg_streaming_yaml', - 'test_iceberg_to_iceberg_batch_yaml' -]) -def _iceberg_cdc_read_test_preprocessor( - test_spec: dict, expected: List[str], env: TestEnvironment): - """ - Preprocessor for tests that involve reading CDC events from Iceberg. - - This preprocessor replaces any ReadFromIcebergCDC transform with a Create - transform that reads from a predefined in-memory dictionary. This allows - the test to verify the pipeline's correctness without relying on Iceberg - tables stored externally. - """ - if pipeline := test_spec.get('pipeline', None): - for transform in pipeline.get('transforms', []): - if transform.get('type', '') == 'ReadFromIcebergCDC': - config = transform['config'] - db_name, table_name = config['table'].split('.') - - transform['type'] = 'Create' - transform['config'] = { - k: v - for k, v in config.items() if k.startswith('__') - } - transform['config']['elements'] = INPUT_TABLES[( - config['catalog_name'], db_name, table_name)] - - return test_spec - - @YamlExamplesTestSuite.register_test_preprocessor([ 'test_spanner_read_yaml', 'test_enrich_spanner_with_bigquery_yaml', @@ -1351,8 +1318,6 @@ def _jinja_preprocessor(raw_spec_string: str, test_name: str): ('orders-test', 'order-database', 'orders'): input_data. spanner_orders_data(), ('db', 'users', 'NY'): input_data.iceberg_dynamic_destinations_users_data(), - ('shipment_data', 'shipment_dataset_bronze', 'shipments'): input_data. - shipment_data_filtered(), ('BigTable', 'beam-test', 'bigtable-enrichment-test'): input_data. bigtable_data(), ('BigQuery', 'ALL_TEST', 'customers'): input_data.bigquery_data(), diff --git a/sdks/python/apache_beam/yaml/examples/testing/input_data.py b/sdks/python/apache_beam/yaml/examples/testing/input_data.py index 115fcd5c0748..fb468567355d 100644 --- a/sdks/python/apache_beam/yaml/examples/testing/input_data.py +++ b/sdks/python/apache_beam/yaml/examples/testing/input_data.py @@ -196,23 +196,6 @@ def shipments_data(): }] -def shipment_data_filtered(): - return [{ - 'customer_id': 'C1', - 'shipment_date': '2023-05-01', - 'shipment_cost': 150.0, - 'customer_name': 'Alice', - 'customer_email': 'alice@example.com' - }, - { - 'customer_id': 'C1', - 'shipment_date': '2023-05-10', - 'shipment_cost': 20.0, - 'customer_name': 'Alice', - 'customer_email': 'alice@example.com' - }] - - def bigtable_data(): return [{ 'product_id': '1', 'product_name': 'pixel 5', 'product_stock': '2' From 4ee32d2550508a3ccefec5cbefd3f27807ac5b51 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Tue, 11 Nov 2025 11:24:07 -0800 Subject: [PATCH 7/8] Adding timestamps --- .../apache_beam/yaml/extended_tests/databases/iceberg.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml index 01687066a929..3dfa54851221 100644 --- a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml +++ b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml @@ -73,7 +73,9 @@ pipelines: catalog_properties: type: hadoop warehouse: "{TEMP_DIR}" - filter: '"label" = ''11a''' + from_timestamp: "2025-11-11 00:00:00 UTC" + to_timestamp: "2035-11-11 00:00:00 UTC" + filter: '"label" = ''11a'' or "rank" = 1' keep: - label - rank @@ -81,6 +83,7 @@ pipelines: config: elements: - {label: "11a", rank: 0} + - {label: "37a", rank: 1} options: project: "apache-beam-testing" temp_location: "{TEMP_DIR}" From 216afae15d2f7f256089953ddad41ddc1f54bad7 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Tue, 11 Nov 2025 12:14:55 -0800 Subject: [PATCH 8/8] Add Streaming test --- .../extended_tests/databases/iceberg.yaml | 32 +++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml index 3dfa54851221..d7449233aab5 100644 --- a/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml +++ b/sdks/python/apache_beam/yaml/extended_tests/databases/iceberg.yaml @@ -62,7 +62,7 @@ pipelines: project: "apache-beam-testing" temp_location: "{TEMP_DIR}" - - name: read_cdc + - name: read_cdc_batch pipeline: type: chain transforms: @@ -73,8 +73,34 @@ pipelines: catalog_properties: type: hadoop warehouse: "{TEMP_DIR}" - from_timestamp: "2025-11-11 00:00:00 UTC" - to_timestamp: "2035-11-11 00:00:00 UTC" + from_timestamp: 1762819200000 + to_timestamp: 2078352000000 + filter: '"label" = ''11a'' or "rank" = 1' + keep: + - label + - rank + - type: AssertEqual + config: + elements: + - {label: "11a", rank: 0} + - {label: "37a", rank: 1} + options: + project: "apache-beam-testing" + temp_location: "{TEMP_DIR}" + + - name: read_cdc_streaming + pipeline: + type: chain + transforms: + - type: ReadFromIcebergCDC + config: + table: db.labels + catalog_name: hadoop_catalog + catalog_properties: + type: hadoop + warehouse: "{TEMP_DIR}" + streaming: True + to_timestamp: 2078352000000 filter: '"label" = ''11a'' or "rank" = 1' keep: - label