diff --git a/CHANGES.md b/CHANGES.md
index dfad320a694d..5292c147368f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -84,6 +84,7 @@

 ## Bugfixes

+* Fixed Flink classic runner failing with "No translator known for PrimitiveUnboundedRead" when using unbounded source connectors like KinesisIO after SDF-to-primitive-read conversion (Java) ([#37035](https://github.com/apache/beam/issues/37035)).
 * Fixed FirestoreV1 Beam connectors allow configuring inconsistent project/database IDs between RPC requests and routing headers #36895 (Java) ([#36895](https://github.com/apache/beam/issues/36895)).
 * Logical type and coder registry are saved for pipelines in the case of default pickler. This fixes a side effect of switching to cloudpickle as default pickler in Beam 2.65.0 (Python) ([#35738](https://github.com/apache/beam/issues/35738)).
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 79a90c554027..c148aa16005e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -97,6 +97,7 @@
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -174,6 +175,14 @@ class FlinkStreamingTransformTranslators {

   public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
       PTransform<?, ?> transform) {
+    // Handle PrimitiveUnboundedRead explicitly (created by SplittableParDo conversion)
+    if (transform instanceof SplittableParDo.PrimitiveUnboundedRead) {
+      return new PrimitiveUnboundedReadTranslator<>();
+    }
+    // Handle PrimitiveBoundedRead explicitly
+    if (transform instanceof SplittableParDo.PrimitiveBoundedRead) {
+      return new PrimitiveBoundedReadTranslator<>();
+    }
     @Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
     return urn == null ? null : TRANSLATORS.get(urn);
   }
@@ -183,10 +192,124 @@ public static String getCurrentTransformName(FlinkStreamingTranslationContext co
     return context.getCurrentTransform().getFullName();
   }

+  /** Returns the parallelism to use for source operators. */
+  private static int getSourceParallelism(FlinkStreamingTranslationContext context) {
+    int maxParallelism = context.getExecutionEnvironment().getMaxParallelism();
+    return maxParallelism > 0 ? maxParallelism : context.getExecutionEnvironment().getParallelism();
+  }
+
   // --------------------------------------------------------------------------------------------
   //  Transformation Implementations
   // --------------------------------------------------------------------------------------------

+  /** Common translation logic for unbounded sources. */
+  @SuppressWarnings("unchecked")
+  private static <T> void translateUnboundedSource(
+      UnboundedSource<T, ?> rawSource,
+      String transformName,
+      FlinkStreamingTranslationContext context) {
+
+    PCollection<T> output =
+        (PCollection<T>)
+            Iterables.getOnlyElement(context.getCurrentTransform().getOutputs().values());
+
+    DataStream<WindowedValue<T>> source;
+    DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
+    TypeInformation<WindowedValue<T>> outputTypeInfo = context.getTypeInfo(output);
+
+    Coder<T> coder = output.getCoder();
+
+    TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
+        new CoderTypeInformation<>(
+            WindowedValues.getFullCoder(
+                ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
+                output.getWindowingStrategy().getWindowFn().windowCoder()),
+            context.getPipelineOptions());
+
+    String fullName = getCurrentTransformName(context);
+    try {
+      int parallelism = getSourceParallelism(context);
+
+      FlinkUnboundedSource<T> unboundedSource =
+          FlinkSource.unbounded(
+              transformName,
+              rawSource,
+              new SerializablePipelineOptions(context.getPipelineOptions()),
+              parallelism);
+      nonDedupSource =
+          context
+              .getExecutionEnvironment()
+              .fromSource(
+                  unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo)
+              .uid(fullName);
+
+      if (rawSource.requiresDeduping()) {
+        source =
+            nonDedupSource
+                .keyBy(new ValueWithRecordIdKeySelector<>())
+                .transform(
+                    "deduping",
+                    outputTypeInfo,
+                    new DedupingOperator<>(context.getPipelineOptions()))
+                .uid(format("%s/__deduplicated__", fullName));
+      } else {
+        source =
+            nonDedupSource
+                .flatMap(new StripIdsMap<>(context.getPipelineOptions()))
+                .returns(outputTypeInfo);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Error while translating UnboundedSource: " + rawSource, e);
+    }
+
+    context.setOutputDataStream(output, source);
+  }
+
+  /** Common translation logic for bounded sources. */
+  @SuppressWarnings("unchecked")
+  private static <T> void translateBoundedSource(
+      BoundedSource<T> rawSource, String transformName, FlinkStreamingTranslationContext context) {
+
+    PCollection<T> output =
+        (PCollection<T>)
+            Iterables.getOnlyElement(context.getCurrentTransform().getOutputs().values());
+
+    TypeInformation<WindowedValue<T>> outputTypeInfo = context.getTypeInfo(output);
+
+    String fullName = getCurrentTransformName(context);
+    int parallelism = getSourceParallelism(context);
+
+    FlinkBoundedSource<T> flinkBoundedSource =
+        FlinkSource.bounded(
+            transformName,
+            rawSource,
+            new SerializablePipelineOptions(context.getPipelineOptions()),
+            parallelism);
+
+    SingleOutputStreamOperator<WindowedValue<T>> source;
+    try {
+      source =
+          context
+              .getExecutionEnvironment()
+              .fromSource(
+                  flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo)
+              .uid(fullName)
+              .returns(outputTypeInfo);
+
+      if (!context.isStreaming()
+          && context
+              .getPipelineOptions()
+              .as(FlinkPipelineOptions.class)
+              .getForceSlotSharingGroup()) {
+        source = source.slotSharingGroup(FORCED_SLOT_GROUP);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e);
+    }
+
+    context.setOutputDataStream(output, source);
+  }
+
   private static class UnboundedReadSourceTranslator<T>
       extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
           PTransform<PBegin, PCollection<T>>> {
@@ -194,22 +317,6 @@ private static class UnboundedReadSourceTranslator
     @Override
     public void translateNode(
         PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) {
-      PCollection<T> output = context.getOutput(transform);
-
-      DataStream<WindowedValue<T>> source;
-      DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
-      TypeInformation<WindowedValue<T>> outputTypeInfo =
-          context.getTypeInfo(context.getOutput(transform));
-
-      Coder<T> coder = context.getOutput(transform).getCoder();
-
-      TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
-          new CoderTypeInformation<>(
-              WindowedValues.getFullCoder(
-                  ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
-                  output.getWindowingStrategy().getWindowFn().windowCoder()),
-              context.getPipelineOptions());
-
       UnboundedSource<T, ?> rawSource;
       try {
         rawSource =
@@ -219,47 +326,43 @@ public void translateNode(
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
+      translateUnboundedSource(rawSource, transform.getName(), context);
+    }
+  }

-      String fullName = getCurrentTransformName(context);
-      try {
-        int parallelism =
-            context.getExecutionEnvironment().getMaxParallelism() > 0
-                ? context.getExecutionEnvironment().getMaxParallelism()
-                : context.getExecutionEnvironment().getParallelism();
-
-        FlinkUnboundedSource<T> unboundedSource =
-            FlinkSource.unbounded(
-                transform.getName(),
-                rawSource,
-                new SerializablePipelineOptions(context.getPipelineOptions()),
-                parallelism);
-        nonDedupSource =
-            context
-                .getExecutionEnvironment()
-                .fromSource(
-                    unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo)
-                .uid(fullName);
+  /**
+   * Translator for {@link SplittableParDo.PrimitiveUnboundedRead}.
+   *
+   * <p>This handles the case where Read.Unbounded is converted to PrimitiveUnboundedRead by {@link
+   * SplittableParDo#convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary}.
+   */
+  private static class PrimitiveUnboundedReadTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+          SplittableParDo.PrimitiveUnboundedRead<T>> {

-        if (rawSource.requiresDeduping()) {
-          source =
-              nonDedupSource
-                  .keyBy(new ValueWithRecordIdKeySelector<>())
-                  .transform(
-                      "deduping",
-                      outputTypeInfo,
-                      new DedupingOperator<>(context.getPipelineOptions()))
-                  .uid(format("%s/__deduplicated__", fullName));
-        } else {
-          source =
-              nonDedupSource
-                  .flatMap(new StripIdsMap<>(context.getPipelineOptions()))
-                  .returns(outputTypeInfo);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error while translating UnboundedSource: " + rawSource, e);
-      }
+    @Override
+    public void translateNode(
+        SplittableParDo.PrimitiveUnboundedRead<T> transform,
+        FlinkStreamingTranslationContext context) {
+      translateUnboundedSource(transform.getSource(), transform.getName(), context);
+    }
+  }
+
+  /**
+   * Translator for {@link SplittableParDo.PrimitiveBoundedRead}.
+   *
+   * <p>This handles the case where Read.Bounded is converted to PrimitiveBoundedRead by {@link
+   * SplittableParDo#convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary}.
+   */
+  private static class PrimitiveBoundedReadTranslator<T>
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
+          SplittableParDo.PrimitiveBoundedRead<T>> {

-      context.setOutputDataStream(output, source);
+    @Override
+    public void translateNode(
+        SplittableParDo.PrimitiveBoundedRead<T> transform,
+        FlinkStreamingTranslationContext context) {
+      translateBoundedSource(transform.getSource(), transform.getName(), context);
     }
   }
@@ -372,11 +475,6 @@ private static class BoundedReadSourceTranslator
     @Override
     public void translateNode(
         PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) {
-      PCollection<T> output = context.getOutput(transform);
-
-      TypeInformation<WindowedValue<T>> outputTypeInfo =
-          context.getTypeInfo(context.getOutput(transform));
-
       BoundedSource<T> rawSource;
       try {
         rawSource =
@@ -386,43 +484,7 @@ public void translateNode(
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
-
-      String fullName = getCurrentTransformName(context);
-      int parallelism =
-          context.getExecutionEnvironment().getMaxParallelism() > 0
-              ? context.getExecutionEnvironment().getMaxParallelism()
-              : context.getExecutionEnvironment().getParallelism();
-
-      FlinkBoundedSource<T> flinkBoundedSource =
-          FlinkSource.bounded(
-              transform.getName(),
-              rawSource,
-              new SerializablePipelineOptions(context.getPipelineOptions()),
-              parallelism);
-
-      TypeInformation<WindowedValue<T>> typeInfo = context.getTypeInfo(output);
-
-      SingleOutputStreamOperator<WindowedValue<T>> source;
-      try {
-        source =
-            context
-                .getExecutionEnvironment()
-                .fromSource(
-                    flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo)
-                .uid(fullName)
-                .returns(typeInfo);
-
-        if (!context.isStreaming()
-            && context
-                .getPipelineOptions()
-                .as(FlinkPipelineOptions.class)
-                .getForceSlotSharingGroup()) {
-          source = source.slotSharingGroup(FORCED_SLOT_GROUP);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e);
-      }
-      context.setOutputDataStream(output, source);
+      translateBoundedSource(rawSource, transform.getName(), context);
     }
   }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java
index 733bf536634c..0d2b547d82ea 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslatorsTest.java
@@ -18,6 +18,8 @@
 package org.apache.beam.runners.flink;

 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

@@ -147,6 +149,88 @@ public void readSourceTranslatorUnboundedWithoutMaxParallelism() {
     assertEquals(parallelism, source.getNumSplits());
   }

+  @Test
+  public void getTranslatorReturnsPrimitiveUnboundedReadTranslator() {
+    SplittableParDo.PrimitiveUnboundedRead<String> transform =
+        new SplittableParDo.PrimitiveUnboundedRead<>(Read.from(new TestUnboundedSource()));
+
+    FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> translator =
+        FlinkStreamingTransformTranslators.getTranslator(transform);
+
+    assertNotNull("Translator should not be null for PrimitiveUnboundedRead", translator);
+  }
+
+  @Test
+  public void getTranslatorReturnsPrimitiveBoundedReadTranslator() {
+    SplittableParDo.PrimitiveBoundedRead<String> transform =
+        new SplittableParDo.PrimitiveBoundedRead<>(Read.from(new TestBoundedSource(100)));
+
+    FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> translator =
+        FlinkStreamingTransformTranslators.getTranslator(transform);
+
+    assertNotNull("Translator should not be null for PrimitiveBoundedRead", translator);
+  }
+
+  @Test
+  public void primitiveUnboundedReadTranslatorProducesCorrectSource() {
+    final int maxParallelism = 4;
+    final int parallelism = 2;
+
+    SplittableParDo.PrimitiveUnboundedRead<String> transform =
+        new SplittableParDo.PrimitiveUnboundedRead<>(Read.from(new TestUnboundedSource()));
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(parallelism);
+    env.setMaxParallelism(maxParallelism);
+
+    // Use getTranslator directly to verify our new translator is used
+    FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> translator =
+        FlinkStreamingTransformTranslators.getTranslator(transform);
+    assertNotNull(translator);
+
+    Object sourceTransform =
+        applyReadSourceTransformWithTranslator(
+            transform, translator, PCollection.IsBounded.UNBOUNDED, env);
+
+    assertTrue(sourceTransform instanceof OneInputTransformation);
+    OneInputTransformation<?, ?> oneInputTransform = (OneInputTransformation<?, ?>) sourceTransform;
+
+    FlinkSource<?, ?> source =
+        (FlinkSource<?, ?>)
+            ((SourceTransformation<?, ?, ?>)
+                    Iterables.getOnlyElement(oneInputTransform.getInputs()))
+                .getSource();
+
+    assertEquals(maxParallelism, source.getNumSplits());
+  }
+
+  @Test
+  public void primitiveBoundedReadTranslatorProducesCorrectSource() {
+    final int maxParallelism = 4;
+    final int parallelism = 2;
+
+    SplittableParDo.PrimitiveBoundedRead<String> transform =
+        new SplittableParDo.PrimitiveBoundedRead<>(
+            Read.from(new TestBoundedSource(maxParallelism)));
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(parallelism);
+    env.setMaxParallelism(maxParallelism);
+
+    // Use getTranslator directly to verify our new translator is used
+    FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> translator =
+        FlinkStreamingTransformTranslators.getTranslator(transform);
+    assertNotNull(translator);
+
+    Object sourceTransform =
+        applyReadSourceTransformWithTranslator(
+            transform, translator, PCollection.IsBounded.BOUNDED, env);
+
+    assertTrue(sourceTransform instanceof SourceTransformation);
+    FlinkBoundedSource<?> source =
+        (FlinkBoundedSource<?>) ((SourceTransformation<?, ?, ?>) sourceTransform).getSource();
+
+    assertEquals(maxParallelism, source.getNumSplits());
+  }
+
   private Object applyReadSourceTransform(
       PTransform<?, ?> transform, PCollection.IsBounded isBounded, StreamExecutionEnvironment env) {
@@ -178,6 +262,40 @@ private Object applyReadSourceTransform(
     return ctx.getInputDataStream(pc).getTransformation();
   }

+  @SuppressWarnings("unchecked")
+  private Object applyReadSourceTransformWithTranslator(
+      PTransform<?, ?> transform,
+      FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> translator,
+      PCollection.IsBounded isBounded,
+      StreamExecutionEnvironment env) {
+
+    FlinkStreamingTranslationContext ctx =
+        new FlinkStreamingTranslationContext(env, PipelineOptionsFactory.create(), true);
+
+    Pipeline pipeline = Pipeline.create();
+    PCollection<String> pc =
+        PCollection.createPrimitiveOutputInternal(
+            pipeline, WindowingStrategy.globalDefault(), isBounded, StringUtf8Coder.of());
+    pc.setName("output");
+
+    Map<TupleTag<?>, PValue> outputs = new HashMap<>();
+    outputs.put(new TupleTag<>(), pc);
+    AppliedPTransform<?, ?, ?> appliedTransform =
+        AppliedPTransform.of(
+            "test-transform",
+            Collections.emptyMap(),
+            PValues.fullyExpand(outputs),
+            transform,
+            ResourceHints.create(),
+            Pipeline.create());
+
+    ctx.setCurrentTransform(appliedTransform);
+    ((FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<?, ?>>) translator)
+        .translateNode(transform, ctx);
+
+    return ctx.getInputDataStream(pc).getTransformation();
+  }
+
   @SuppressWarnings("unchecked")
   private FlinkStreamingPipelineTranslator.StreamTransformTranslator<PTransform<?, ?>>
       getReadSourceTranslator() {
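---

Note for reviewers: a minimal repro sketch of the failure mode this patch fixes, not part of the diff. It assumes the classic (non-portable) Flink runner; the class name `PrimitiveUnboundedReadRepro` is illustrative, `GenerateSequence` stands in for any `UnboundedSource`-backed connector such as KinesisIO, and the comment about the `use_deprecated_read` experiment reflects my understanding of how the SDF-to-primitive-read conversion is triggered.

```java
import java.util.Collections;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class PrimitiveUnboundedReadRepro {
  public static void main(String[] args) {
    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setStreaming(true);
    // Assumption: this experiment opts out of the SDF wrapper, so the runner applies
    // SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary and the
    // graph ends up containing SplittableParDo.PrimitiveUnboundedRead nodes.
    options
        .as(ExperimentalOptions.class)
        .setExperiments(Collections.singletonList("use_deprecated_read"));

    Pipeline p = Pipeline.create(options);
    // GenerateSequence without .to() is backed by an unbounded CountingSource; any
    // UnboundedSource-based connector (e.g. KinesisIO) exercises the same translation path.
    p.apply(GenerateSequence.from(0));

    // Before this patch, streaming translation failed with
    // "No translator known for PrimitiveUnboundedRead"; with it, the node is handled by
    // the new PrimitiveUnboundedReadTranslator.
    p.run();
  }
}
```

The `instanceof` checks are placed ahead of the URN lookup in `getTranslator` because the URN-based dispatch did not resolve these runner-inserted transforms, which is exactly what produced the "No translator known for PrimitiveUnboundedRead" error reported in #37035.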