Skip to content

Commit 2ffd443

Browse files
authored
Fix inconsistent handling of Firestore Project and Database ID (#36895)
* Fix inconsistent handling of Firestore Project and Database ID in routing header (resolves #36894) * Update CHANGED.md * Address reviewer comments from yixiaoshen * Fix unit tests * revert unnecessary change to read test * Revert unnecessary change to test helper
1 parent bdd0425 commit 2ffd443

File tree

8 files changed

+31
-18
lines changed

8 files changed

+31
-18
lines changed

CHANGES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484

8585
## Bugfixes
8686

87-
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
87+
* Fixed FirestoreV1 Beam connectors allow configuring inconsistent project/database IDs between RPC requests and routing headers #36895 (Java) ([#36895](https://github.com/apache/beam/issues/36895)).
8888

8989
## Known Issues
9090

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,7 @@ public interface FirestoreOptions extends PipelineOptions {
4848
*/
4949
void setEmulatorHost(String host);
5050

51-
/**
52-
* The Firestore database ID to connect to. Note: named database is currently an internal feature
53-
* in Firestore. Do not set this to anything other than "(default)".
54-
*/
51+
/** The Firestore database ID to connect to. */
5552
@Description("Firestore database ID")
5653
@Default.String("(default)")
5754
String getFirestoreDb();

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.io.Serializable;
3030
import java.security.SecureRandom;
3131
import java.util.Map;
32+
import javax.annotation.Nullable;
3233
import javax.annotation.concurrent.Immutable;
3334
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
3435
import org.apache.beam.sdk.options.PipelineOptions;
@@ -65,6 +66,13 @@ private FirestoreStatefulComponentFactory() {}
6566
* @return a new {@link FirestoreStub} pre-configured with values from the provided options
6667
*/
6768
FirestoreStub getFirestoreStub(PipelineOptions options) {
69+
return getFirestoreStub(options, null, null);
70+
}
71+
72+
FirestoreStub getFirestoreStub(
73+
PipelineOptions options,
74+
@Nullable String configuredProjectId,
75+
@Nullable String configuredDatabaseId) {
6876
try {
6977
FirestoreSettings.Builder builder = FirestoreSettings.newBuilder();
7078

@@ -94,12 +102,17 @@ FirestoreStub getFirestoreStub(PipelineOptions options) {
94102
builder
95103
.setCredentialsProvider(FixedCredentialsProvider.create(gcpOptions.getGcpCredential()))
96104
.setEndpoint(firestoreOptions.getFirestoreHost());
105+
String projectId =
106+
configuredProjectId != null
107+
? configuredProjectId
108+
: firestoreOptions.getFirestoreProject();
109+
if (projectId == null) {
110+
projectId = gcpOptions.getProject();
111+
}
112+
String databaseId =
113+
configuredDatabaseId != null ? configuredDatabaseId : firestoreOptions.getFirestoreDb();
97114
headers.put(
98-
"x-goog-request-params",
99-
"project_id="
100-
+ gcpOptions.getProject()
101-
+ "&database_id="
102-
+ firestoreOptions.getFirestoreDb());
115+
"x-goog-request-params", "project_id=" + projectId + "&database_id=" + databaseId);
103116
}
104117

105118
builder.setHeaderProvider(

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,9 @@ public final void startBundle(StartBundleContext c) {
244244
requireNonNull(
245245
databaseId,
246246
"firestoreDb must be defined on FirestoreOptions of PipelineOptions"));
247-
firestoreStub = firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions());
247+
firestoreStub =
248+
firestoreStatefulComponentFactory.getFirestoreStub(
249+
c.getPipelineOptions(), project, databaseId);
248250
}
249251

250252
/**

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public final void setUp() {
111111
when(rpcQos.newWriteAttempt(any())).thenReturn(attempt, attempt2);
112112

113113
when(ff.getRpcQos(any())).thenReturn(rpcQos);
114-
when(ff.getFirestoreStub(pipelineOptions)).thenReturn(stub);
114+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
115115
when(stub.batchWriteCallable()).thenReturn(callable);
116116
metricsFixture = new MetricsFixture();
117117
}
@@ -129,7 +129,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception {
129129
Write write = newWrite();
130130
Element<Write> element1 = new WriteElement(0, write, window);
131131

132-
when(ff.getFirestoreStub(any())).thenReturn(stub);
132+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
133133
when(ff.getRpcQos(any())).thenReturn(rpcQos);
134134
when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite))
135135
.thenReturn(attempt);
@@ -175,7 +175,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception {
175175
@Override
176176
@Test
177177
public final void noRequestIsSentIfNotSafeToProceed() throws Exception {
178-
when(ff.getFirestoreStub(any())).thenReturn(stub);
178+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
179179
when(ff.getRpcQos(any())).thenReturn(rpcQos);
180180
when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite))
181181
.thenReturn(attempt);
@@ -369,7 +369,7 @@ public final void endToEnd_deadlineExceededOnAnIndividualWriteResultsInThrottlin
369369
LOG.debug("options = {}", options);
370370

371371
FirestoreStatefulComponentFactory ff = mock(FirestoreStatefulComponentFactory.class);
372-
when(ff.getFirestoreStub(any())).thenReturn(stub);
372+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
373373
Random random = new Random(12345);
374374
TestClock clock = new TestClock(Instant.EPOCH, Duration.standardSeconds(1));
375375
Sleeper sleeper = millis -> clock.setNext(advanceClockBy(Duration.millis(millis)));

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void enqueueingWritesValidateBytesSize() throws Exception {
6767
int maxBytes = 50;
6868
RpcQosOptions options = rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build();
6969

70-
when(ff.getFirestoreStub(any())).thenReturn(stub);
70+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
7171
when(ff.getRpcQos(any()))
7272
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
7373

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void enqueueingWritesValidateBytesSize() throws Exception {
7676
int maxBytes = 50;
7777
RpcQosOptions options = rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build();
7878

79-
when(ff.getFirestoreStub(any())).thenReturn(stub);
79+
when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub);
8080
when(ff.getRpcQos(any()))
8181
.thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options));
8282

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.stream.Stream;
4343
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
4444
import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO;
45+
import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions;
4546
import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions;
4647
import org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.CleanupMode;
4748
import org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.DataLayout;
@@ -97,7 +98,7 @@ abstract class BaseFirestoreIT {
9798
@Before
9899
public void setup() {
99100
project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
100-
databaseId = "firestoredb";
101+
databaseId = TestPipeline.testingPipelineOptions().as(FirestoreOptions.class).getFirestoreDb();
101102
}
102103

103104
private static Instant toWriteTime(WriteResult result) {

0 commit comments

Comments
 (0)