From 9a5af9b414a929812f13cde3dbd5fe426a5d2f9d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rados=C5=82aw=20Stankiewicz?=
Date: Wed, 26 Nov 2025 20:35:44 +0100
Subject: [PATCH] fix retry scenario for query to table materialization (#36912)

* fix retry scenario for query to table materialization

* fix retry scenario for query to table materialization
---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java  |  2 ++
 .../bigquery/BigQueryStorageQuerySource.java  | 32 +++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 7aef1bd1ce02..69b9c62ceea9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -979,6 +979,8 @@ public void processElement(
                           getParseFn(),
                           getOutputCoder(),
                           getBigQueryServices());
+                  // due to retry, table may already exist, remove it to ensure correctness
+                  querySource.removeDestinationIfExists(options.as(BigQueryOptions.class));
                   Table queryResultTable =
                       querySource.getTargetTable(options.as(BigQueryOptions.class));
                   BigQueryStorageTableSource output =
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
index a2350ef19a74..07c3273c293c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.services.bigquery.model.JobStatistics;
@@ -25,6 +26,7 @@
 import com.google.cloud.bigquery.storage.v1.DataFormat;
 import java.io.IOException;
 import java.io.ObjectInputStream;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
@@ -188,4 +190,34 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
   protected @Nullable String getTargetTableId(BigQueryOptions options) throws Exception {
     return null;
   }
+
+  /**
+   * Deletes the temporary query destination table if it already exists.
+   *
+   * The table name is derived from the job name and {@code stepUuid}. Per the call site, a
+   * retried attempt may find the table left behind by an earlier one, so it is removed before
+   * the query is materialized again.
+   *
+   * @param options BigQuery pipeline options supplying the job name and the fallback project
+   * @throws Exception if creating the dataset service, the lookup, or the deletion fails
+   */
+  void removeDestinationIfExists(BigQueryOptions options) throws Exception {
+    String project = queryTempProject;
+    if (project == null) {
+      // No explicit temp project: prefer the BigQuery project, else the pipeline project.
+      String bigQueryProject = options.getBigQueryProject();
+      project = bigQueryProject == null ? options.getProject() : bigQueryProject;
+    }
+    String tempTableId =
+        BigQueryResourceNaming.createJobIdPrefix(
+            options.getJobName(), stepUuid, BigQueryResourceNaming.JobType.QUERY);
+    TableReference tempTableReference =
+        createTempTableReference(project, tempTableId, Optional.ofNullable(queryTempDataset));
+    // Release the service when done; assumes DatasetService is AutoCloseable - confirm.
+    try (DatasetService datasetService = bqServices.getDatasetService(options)) {
+      if (datasetService.getTable(tempTableReference) != null) {
+        datasetService.deleteTable(tempTableReference);
+      }
+    }
+  }
 }