Commit f3eed3d

[Flink Runner] Add translators for PrimitiveUnboundedRead and PrimitiveBoundedRead
This commit adds explicit translators for SplittableParDo.PrimitiveUnboundedRead and SplittableParDo.PrimitiveBoundedRead to the Flink streaming transform translators. When the beam_fn_api experiment is not used, the Flink classic runner calls convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(), which converts SDF-wrapped reads into PrimitiveUnboundedRead and PrimitiveBoundedRead. Without translators for these transforms, pipelines using unbounded sources such as KinesisIO.read() fail with: 'No translator known for PrimitiveUnboundedRead'.

Changes:
- Add PrimitiveUnboundedReadTranslator class
- Add PrimitiveBoundedReadTranslator class
- Modify getTranslator() to handle these transforms before URN lookup
- Add unit tests for the new translators
- Update CHANGES.md with bugfix entry

Related to #20530
1 parent: 586b7c1
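As a rough illustration of the dispatch described in the commit message, the following is a minimal sketch of how getTranslator() could check for the primitive read transforms before the usual URN-based lookup. This is not the committed code: the field names primitiveUnboundedReadTranslator / primitiveBoundedReadTranslator and the TRANSLATORS map name are assumptions made for illustration, and imports are elided because the packages of SplittableParDo and PTransformTranslation vary across Beam versions.

  // Sketch only: assumes a URN-keyed translator map named TRANSLATORS and two
  // hypothetical fields holding the new translator instances.
  static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
      PTransform<?, ?> transform) {
    // convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary() rewrites
    // SDF-wrapped reads into these primitive transforms when the beam_fn_api
    // experiment is not set, so they are matched by type before the URN lookup.
    if (transform instanceof SplittableParDo.PrimitiveUnboundedRead) {
      return primitiveUnboundedReadTranslator; // hypothetical field with the new translator
    }
    if (transform instanceof SplittableParDo.PrimitiveBoundedRead) {
      return primitiveBoundedReadTranslator; // hypothetical field with the new translator
    }
    // Existing behavior: look the translator up by the transform's URN.
    return TRANSLATORS.get(PTransformTranslation.urnForTransformOrNull(transform));
  }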

File tree: 2 files changed (+6, -4 lines)


CHANGES.md

Lines changed: 1 addition & 0 deletions
@@ -84,6 +84,7 @@
 
 ## Bugfixes
 
+* Fixed Flink classic runner failing with "No translator known for PrimitiveUnboundedRead" when using unbounded source connectors like KinesisIO after SDF-to-primitive-read conversion (Java) ([#XXXXX](https://github.com/apache/beam/issues/XXXXX)).
 * Fixed FirestoreV1 Beam connectors allow configuring inconsistent project/database IDs between RPC requests and routing headers #36895 (Java) ([#36895](https://github.com/apache/beam/issues/36895)).
 * Logical type and coder registry are saved for pipelines in the case of default pickler. This fixes a side effect of switching to cloudpickle as default pickler in Beam 2.65.0 (Python) ([#35738](https://github.com/apache/beam/issues/35738)).
 

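To make the failure mode behind the new changelog entry concrete, a pipeline along the following lines previously failed at translation time on the classic Flink runner (i.e. without the beam_fn_api experiment). This is an illustrative sketch, not code from the commit; CountingSource.unbounded() simply stands in for any unbounded source, such as the one KinesisIO.read() builds internally.

  import org.apache.beam.runners.flink.FlinkPipelineOptions;
  import org.apache.beam.runners.flink.FlinkRunner;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.CountingSource;
  import org.apache.beam.sdk.io.Read;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;

  public class PrimitiveUnboundedReadRepro {
    public static void main(String[] args) {
      FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
      options.setRunner(FlinkRunner.class);
      options.setStreaming(true);

      Pipeline pipeline = Pipeline.create(options);

      // Read.from(an UnboundedSource) is rewritten by the classic runner's
      // convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary() into
      // SplittableParDo.PrimitiveUnboundedRead; before this commit the streaming
      // translators had no entry for that transform, so translation failed with
      // "No translator known for PrimitiveUnboundedRead".
      pipeline.apply(Read.from(CountingSource.unbounded()));

      pipeline.run();
    }
  }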
runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java

Lines changed: 5 additions & 4 deletions
@@ -196,7 +196,8 @@ public void primitiveUnboundedReadTranslatorProducesCorrectSource() {
 
     FlinkSource<?, ?> source =
         (FlinkSource<?, ?>)
-            ((SourceTransformation<?, ?, ?>) Iterables.getOnlyElement(oneInputTransform.getInputs()))
+            ((SourceTransformation<?, ?, ?>)
+                    Iterables.getOnlyElement(oneInputTransform.getInputs()))
                 .getSource();
 
     assertEquals(maxParallelism, source.getNumSplits());
@@ -208,7 +209,8 @@ public void primitiveBoundedReadTranslatorProducesCorrectSource() {
     final int parallelism = 2;
 
     SplittableParDo.PrimitiveBoundedRead<String> transform =
-        new SplittableParDo.PrimitiveBoundedRead<>(Read.from(new TestBoundedSource(maxParallelism)));
+        new SplittableParDo.PrimitiveBoundedRead<>(
+            Read.from(new TestBoundedSource(maxParallelism)));
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.setParallelism(parallelism);
     env.setMaxParallelism(maxParallelism);
@@ -288,8 +290,7 @@ private Object applyReadSourceTransformWithTranslator(
             Pipeline.create());
 
     ctx.setCurrentTransform(appliedTransform);
-    ((FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<?, ?>>)
-            translator)
+    ((FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<?, ?>>) translator)
         .translateNode(transform, ctx);
 
     return ctx.getInputDataStream(pc).getTransformation();
