37 changes: 36 additions & 1 deletion sdks/python/apache_beam/yaml/examples/testing/examples_test.py
@@ -679,7 +679,9 @@ def _kafka_test_preprocessor(
'test_anomaly_scoring_yaml',
'test_wordCountInclude_yaml',
'test_wordCountImport_yaml',
'test_iceberg_to_alloydb_yaml'
'test_iceberg_to_alloydb_yaml',
'test_iceberg_to_iceberg_streaming_yaml',
'test_iceberg_to_iceberg_batch_yaml'
])
def _io_write_test_preprocessor(
test_spec: dict, expected: List[str], env: TestEnvironment):
@@ -792,6 +794,37 @@ def _iceberg_io_read_test_preprocessor(
return test_spec


@YamlExamplesTestSuite.register_test_preprocessor([
'test_iceberg_to_iceberg_streaming_yaml',
'test_iceberg_to_iceberg_batch_yaml'
])
def _iceberg_cdc_read_test_preprocessor(
test_spec: dict, expected: List[str], env: TestEnvironment):
"""
Preprocessor for tests that involve reading CDC events from Iceberg.

This preprocessor replaces any ReadFromIcebergCDC transform with a Create
transform that reads from a predefined in-memory dictionary. This allows
the test to verify the pipeline's correctness without relying on Iceberg
tables stored externally.
"""
if pipeline := test_spec.get('pipeline', None):
for transform in pipeline.get('transforms', []):
if transform.get('type', '') == 'ReadFromIcebergCDC':
config = transform['config']
db_name, table_name = config['table'].split('.')

transform['type'] = 'Create'
transform['config'] = {
k: v
for k, v in config.items() if k.startswith('__')
}
transform['config']['elements'] = INPUT_TABLES[(
config['catalog_name'], db_name, table_name)]

return test_spec


@YamlExamplesTestSuite.register_test_preprocessor([
'test_spanner_read_yaml',
'test_enrich_spanner_with_bigquery_yaml',
@@ -1318,6 +1351,8 @@ def _jinja_preprocessor(raw_spec_string: str, test_name: str):
('orders-test', 'order-database', 'orders'): input_data.
spanner_orders_data(),
('db', 'users', 'NY'): input_data.iceberg_dynamic_destinations_users_data(),
('shipment_data', 'shipment_dataset_bronze', 'shipments'): input_data.
shipment_data_filtered(),
('BigTable', 'beam-test', 'bigtable-enrichment-test'): input_data.
bigtable_data(),
('BigQuery', 'ALL_TEST', 'customers'): input_data.bigquery_data(),
17 changes: 17 additions & 0 deletions sdks/python/apache_beam/yaml/examples/testing/input_data.py
@@ -196,6 +196,23 @@ def shipments_data():
}]


def shipment_data_filtered():
return [{
'customer_id': 'C1',
'shipment_date': '2023-05-01',
'shipment_cost': 150.0,
'customer_name': 'Alice',
'customer_email': '[email protected]'
},
{
'customer_id': 'C1',
'shipment_date': '2023-05-10',
'shipment_cost': 20.0,
'customer_name': 'Alice',
'customer_email': '[email protected]'
}]


def bigtable_data():
return [{
'product_id': '1', 'product_name': 'pixel 5', 'product_stock': '2'
@@ -0,0 +1,62 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# A pipeline that reads records from a Medallion Bronze Iceberg dataset
# in a batch manner and writes to a Silver Iceberg dataset.

pipeline:
type: chain
transforms:
# Step 1: Read Records from Iceberg Table within a time range
- type: ReadFromIcebergCDC
name: ReadFromBronzeDataset
config:
table: "shipment_dataset_bronze.shipments"
catalog_name: "shipment_data"
catalog_properties:
type: "rest"
uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
warehouse: "gs://biglake_shipment_data"
header.x-goog-user-project: "apache-beam-testing"
rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"
rest-metrics-reporting-enabled: "false"
filter: '"customer_id" IS NOT NULL AND "customer_id" = ''C1'' AND "shipment_cost" > 0'
keep:
- "customer_id"
- "shipment_date"
- "shipment_cost"
- "customer_name"
- "customer_email"

# Step 2: Write records out to Iceberg
- type: WriteToIceberg
name: WriteToSilverDataset
config:
table: "shipment_dataset_silver.shipments"
catalog_name: "shipment_data"
catalog_properties:
type: "rest"
uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
warehouse: "gs://biglake_shipment_data"
header.x-goog-user-project: "apache-beam-testing"
rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"
rest-metrics-reporting-enabled: "false"

# Expected:
# Row(customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='[email protected]')
# Row(customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='[email protected]')
@@ -0,0 +1,65 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# A pipeline that reads append CDC events from a Medallion Bronze Iceberg dataset
# and writes them to a Silver Iceberg dataset.

pipeline:
type: chain
transforms:
# Step 1: Read CDC Append Records from Iceberg Table
- type: ReadFromIcebergCDC
name: ReadFromBronzeDataset
config:
table: "shipment_dataset_bronze.shipments"
catalog_name: "shipment_data"
catalog_properties:
type: "rest"
uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
Contributor:
So it seems like these are pure examples currently. Is the plan to parameterize these in the future so that they can be used in Flex Template and Job Builder blueprints? If so, can we add such parameterization now?

derrickaw (Collaborator), Nov 3, 2025:
This is a good point. I think we need to consider that the golden blueprints used in the Dataflow templates repo have to be properly tested.

Ideally, we should probably have:

  1. One example blueprint that is properly filled out for users to use and that has mock testing.
  2. That same example blueprint with jinja variable parameterization and integration testing, so that we know it works correctly.
  3. Any additional transform testing for precommits or postcommits.

What do you think about this:

  1. Add a golden jinja YAML blueprint in extended_tests/blueprints that we can use both for integration testing and for eventually linking in the Dataflow templates repo.
  2. Create the example blueprints dynamically, based on these golden ones.

Thoughts?

Contributor:
That sounds good to me. Thanks!

tarun-google (Contributor, Author), Nov 3, 2025:
I agree with these in an ideal scenario, but I think we cannot do exact integration tests for these YAML files, especially streaming pipelines, which never end. We are already writing integration tests in extended_tests that are close to blueprints. Maybe we can add jinja-parameterized blueprints directly to the blueprints folder?

tarun-google (Contributor, Author):
To add to that, consider this CDC use case: first we need an Iceberg table, so we cannot just have a raw blueprint that is testable. We need to create the tables (which we already do in integration tests) or maintain them, and even then I do not think every blueprint is testable on its own.

Contributor:
I think it's good to consider developing the main pipeline and the testing separately.

  • The main pipeline should be something parameterized that we can use for templates/blueprints. This is our source of truth and contains the main code we need to keep correct via testing etc. It should not have specific parameters (references to specific databases etc.) unless we want to make those the defaults for the template/blueprint.

  • A close-enough pipeline that we want to test, dynamically derived from the above. In some cases this can be the original pipeline (for example, Kafka, which can be run for a limited time). But for Pub/Sub we either need to inject mock transforms and/or cancel the pipeline using a runner-specific mechanism (for example, the Dataflow cancel operation [1]).

[1] https://docs.cloud.google.com/dataflow/docs/guides/stopping-a-pipeline
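
A minimal sketch (not part of this PR) of the "run for a bounded time, then cancel" approach mentioned above; the helper name and timeout are illustrative, and it assumes the runner's PipelineResult supports wait_until_finish(duration) and cancel():

import apache_beam as beam
from apache_beam.runners.runner import PipelineState


def run_streaming_test_bounded(pipeline: beam.Pipeline, timeout_ms: int = 300000):
  """Runs the pipeline, waits up to timeout_ms, then cancels it if still running."""
  result = pipeline.run()
  # On runners such as Dataflow, wait_until_finish accepts a duration in
  # milliseconds and returns once it elapses, even if the job is still running.
  result.wait_until_finish(duration=timeout_ms)
  if not PipelineState.is_terminal(result.state):
    result.cancel()
  return result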

Collaborator:
Chatted with Danny offline and already mentioned this to Tarun: we are moving everything to the Dataflow repo as the source of truth, to reduce redundancy and complexity.

tarun-google (Contributor, Author), Nov 10, 2025:
As per the discussion, I have removed the blueprints from this repo but left the integration tests. Since we define the YAML->SDK mapping in this repo, we should continue adding these tests. Specific integration tests for blueprints will go to the Templates repo.

@chamikaramj do we have to wait for a Beam release before the new YAML features added to this repo can be used in the Templates repo?

Contributor:
No, we can add CDC support for YAML and tests here.

warehouse: "gs://biglake_shipment_data"
header.x-goog-user-project: "apache-beam-testing"
rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"
rest-metrics-reporting-enabled: "false"
poll_interval_seconds: 30
streaming: true
filter: '"customer_id" IS NOT NULL AND "customer_id" = ''C1'' AND "shipment_cost" > 0'
keep:
- "customer_id"
- "shipment_date"
- "shipment_cost"
- "customer_name"
- "customer_email"

# Step 2: Write records out to Iceberg
- type: WriteToIceberg
name: WriteToSilverDataset
config:
table: "shipment_dataset_silver.shipments"
catalog_name: "shipment_data"
catalog_properties:
type: "rest"
uri: "https://biglake.googleapis.com/iceberg/v1beta/restcatalog"
warehouse: "gs://biglake_shipment_data"
header.x-goog-user-project: "apache-beam-testing"
rest.auth.type: "org.apache.iceberg.gcp.auth.GoogleAuthManager"
rest-metrics-reporting-enabled: "false"
triggering_frequency_seconds: 30

# Expected:
# Row(customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='[email protected]')
# Row(customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='[email protected]')
@@ -60,4 +60,27 @@ pipelines:
- {label: "389a", rank: 2}
options:
project: "apache-beam-testing"
temp_location: "{TEMP_DIR}"

- name: read_cdc
pipeline:
type: chain
transforms:
- type: ReadFromIcebergCDC
config:
table: db.labels
catalog_name: hadoop_catalog
catalog_properties:
type: hadoop
warehouse: "{TEMP_DIR}"
filter: '"label" = ''11a'''
keep:
- label
- rank
- type: AssertEqual
config:
elements:
- {label: "11a", rank: 0}
Contributor:
Can we update the test to include more than one element? For example, a stream of data read from a predefined Iceberg table that spans multiple snapshots.

tarun-google (Contributor, Author), Nov 11, 2025:
Added a filter on timestamp for this decade :)) and also added multiple conditions to get more than one record. I could not do streaming or snapshot filters in the existing setup; snapshots are catalog-level metadata.

tarun-google (Contributor, Author):
Update: adding a streaming pipeline integration test too, with a timestamp cutoff.

options:
project: "apache-beam-testing"
temp_location: "{TEMP_DIR}"
28 changes: 28 additions & 0 deletions sdks/python/apache_beam/yaml/standard_io.yaml
@@ -403,3 +403,31 @@
'WriteToBigTable': 'beam:schematransform:org.apache.beam:bigtable_write:v1'
config:
gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar'

# IcebergCDC
- type: renaming
transforms:
'ReadFromIcebergCDC': 'ReadFromIcebergCDC'
config:
mappings:
'ReadFromIcebergCDC':
table: 'table'
catalog_name: 'catalog_name'
catalog_properties: 'catalog_properties'
config_properties: 'config_properties'
drop: 'drop'
filter: 'filter'
from_snapshot: 'from_snapshot'
from_timestamp: 'from_timestamp'
keep: 'keep'
poll_interval_seconds: 'poll_interval_seconds'
starting_strategy: 'starting_strategy'
streaming: 'streaming'
to_snapshot: 'to_snapshot'
to_timestamp: 'to_timestamp'
underlying_provider:
type: beamJar
transforms:
'ReadFromIcebergCDC': 'beam:schematransform:org.apache.beam:iceberg_cdc_read:v1'
config:
gradle_target: 'sdks:java:io:expansion-service:shadowJar'