Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@

## Bugfixes

* Fixed Flink classic runner failing with "No translator known for PrimitiveUnboundedRead" when using unbounded source connectors like KinesisIO after SDF-to-primitive-read conversion (Java) ([#37035](https://github.com/apache/beam/issues/37035)).
* Fixed FirestoreV1 Beam connectors allow configuring inconsistent project/database IDs between RPC requests and routing headers #36895 (Java) ([#36895](https://github.com/apache/beam/issues/36895)).
Logical type and coder registry are saved for pipelines in the case of default pickler. This fixes a side effect of switching to cloudpickle as default pickler in Beam 2.65.0 (Python) ([#35738](https://github.com/apache/beam/issues/35738)).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
Expand Down Expand Up @@ -174,6 +175,14 @@ class FlinkStreamingTransformTranslators {

public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(
PTransform<?, ?> transform) {
// Handle PrimitiveUnboundedRead explicitly (created by SplittableParDo conversion)
if (transform instanceof SplittableParDo.PrimitiveUnboundedRead) {
return new PrimitiveUnboundedReadTranslator<>();
}
// Handle PrimitiveBoundedRead explicitly
if (transform instanceof SplittableParDo.PrimitiveBoundedRead) {
return new PrimitiveBoundedReadTranslator<>();
}
@Nullable String urn = PTransformTranslation.urnForTransformOrNull(transform);
return urn == null ? null : TRANSLATORS.get(urn);
}
Expand All @@ -183,33 +192,131 @@ public static String getCurrentTransformName(FlinkStreamingTranslationContext co
return context.getCurrentTransform().getFullName();
}

/** Returns the parallelism to use for source operators. */
private static int getSourceParallelism(FlinkStreamingTranslationContext context) {
int maxParallelism = context.getExecutionEnvironment().getMaxParallelism();
return maxParallelism > 0 ? maxParallelism : context.getExecutionEnvironment().getParallelism();
}

// --------------------------------------------------------------------------------------------
// Transformation Implementations
// --------------------------------------------------------------------------------------------

/** Common translation logic for unbounded sources. */
@SuppressWarnings("unchecked")
private static <T> void translateUnboundedSource(
UnboundedSource<T, ?> rawSource,
String transformName,
FlinkStreamingTranslationContext context) {

PCollection<T> output =
(PCollection<T>)
Iterables.getOnlyElement(context.getCurrentTransform().getOutputs().values());
Comment on lines +212 to +214
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This logic to retrieve the single output PCollection is duplicated in translateBoundedSource (line 273). To improve maintainability and reduce code duplication, consider extracting this into a private static helper method within this class.

For example:

@SuppressWarnings("unchecked")
private static <T> PCollection<T> getOnlyOutput(FlinkStreamingTranslationContext context) {
  return (PCollection<T>)
      Iterables.getOnlyElement(context.getCurrentTransform().getOutputs().values());
}

You could then call getOnlyOutput(context) in both translateUnboundedSource and translateBoundedSource.

Copy link
Author

@dnamaz dnamaz Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered this, but felt that extracting a 2-3 line helper for code that's only duplicated in two places within the same file would be over-abstraction. The Iterables.getOnlyElement() call is already self-documenting about the expected behavior.
@gemini-code-assist

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/gemini review

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You make a valid point. While the DRY principle is generally good for maintainability, it's also important to balance it with readability and avoiding unnecessary abstraction, especially for very short and self-documenting code snippets like Iterables.getOnlyElement(). Given that the duplication is limited to two places within the same file, your decision to keep it as is, prioritizing immediate readability and avoiding potential over-abstraction, is perfectly reasonable. Thank you for the thoughtful explanation!


DataStream<WindowedValue<T>> source;
DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
TypeInformation<WindowedValue<T>> outputTypeInfo = context.getTypeInfo(output);

Coder<T> coder = output.getCoder();

TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
new CoderTypeInformation<>(
WindowedValues.getFullCoder(
ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
output.getWindowingStrategy().getWindowFn().windowCoder()),
context.getPipelineOptions());

String fullName = getCurrentTransformName(context);
try {
int parallelism = getSourceParallelism(context);

FlinkUnboundedSource<T> unboundedSource =
FlinkSource.unbounded(
transformName,
rawSource,
new SerializablePipelineOptions(context.getPipelineOptions()),
parallelism);
nonDedupSource =
context
.getExecutionEnvironment()
.fromSource(
unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo)
.uid(fullName);

if (rawSource.requiresDeduping()) {
source =
nonDedupSource
.keyBy(new ValueWithRecordIdKeySelector<>())
.transform(
"deduping",
outputTypeInfo,
new DedupingOperator<>(context.getPipelineOptions()))
.uid(format("%s/__deduplicated__", fullName));
} else {
source =
nonDedupSource
.flatMap(new StripIdsMap<>(context.getPipelineOptions()))
.returns(outputTypeInfo);
}
} catch (Exception e) {
throw new RuntimeException("Error while translating UnboundedSource: " + rawSource, e);
}

context.setOutputDataStream(output, source);
}

/** Common translation logic for bounded sources. */
@SuppressWarnings("unchecked")
private static <T> void translateBoundedSource(
BoundedSource<T> rawSource, String transformName, FlinkStreamingTranslationContext context) {

PCollection<T> output =
(PCollection<T>)
Iterables.getOnlyElement(context.getCurrentTransform().getOutputs().values());

TypeInformation<WindowedValue<T>> outputTypeInfo = context.getTypeInfo(output);

String fullName = getCurrentTransformName(context);
int parallelism = getSourceParallelism(context);

FlinkBoundedSource<T> flinkBoundedSource =
FlinkSource.bounded(
transformName,
rawSource,
new SerializablePipelineOptions(context.getPipelineOptions()),
parallelism);

SingleOutputStreamOperator<WindowedValue<T>> source;
try {
source =
context
.getExecutionEnvironment()
.fromSource(
flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo)
.uid(fullName)
.returns(outputTypeInfo);

if (!context.isStreaming()
&& context
.getPipelineOptions()
.as(FlinkPipelineOptions.class)
.getForceSlotSharingGroup()) {
source = source.slotSharingGroup(FORCED_SLOT_GROUP);
}
} catch (Exception e) {
throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e);
}

context.setOutputDataStream(output, source);
}

private static class UnboundedReadSourceTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
PTransform<PBegin, PCollection<T>>> {

@Override
public void translateNode(
PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) {
PCollection<T> output = context.getOutput(transform);

DataStream<WindowedValue<T>> source;
DataStream<WindowedValue<ValueWithRecordId<T>>> nonDedupSource;
TypeInformation<WindowedValue<T>> outputTypeInfo =
context.getTypeInfo(context.getOutput(transform));

Coder<T> coder = context.getOutput(transform).getCoder();

TypeInformation<WindowedValue<ValueWithRecordId<T>>> withIdTypeInfo =
new CoderTypeInformation<>(
WindowedValues.getFullCoder(
ValueWithRecordId.ValueWithRecordIdCoder.of(coder),
output.getWindowingStrategy().getWindowFn().windowCoder()),
context.getPipelineOptions());

UnboundedSource<T, ?> rawSource;
try {
rawSource =
Expand All @@ -219,47 +326,43 @@ public void translateNode(
} catch (IOException e) {
throw new RuntimeException(e);
}
translateUnboundedSource(rawSource, transform.getName(), context);
}
}

String fullName = getCurrentTransformName(context);
try {
int parallelism =
context.getExecutionEnvironment().getMaxParallelism() > 0
? context.getExecutionEnvironment().getMaxParallelism()
: context.getExecutionEnvironment().getParallelism();

FlinkUnboundedSource<T> unboundedSource =
FlinkSource.unbounded(
transform.getName(),
rawSource,
new SerializablePipelineOptions(context.getPipelineOptions()),
parallelism);
nonDedupSource =
context
.getExecutionEnvironment()
.fromSource(
unboundedSource, WatermarkStrategy.noWatermarks(), fullName, withIdTypeInfo)
.uid(fullName);
/**
* Translator for {@link SplittableParDo.PrimitiveUnboundedRead}.
*
* <p>This handles the case where Read.Unbounded is converted to PrimitiveUnboundedRead by {@link
* SplittableParDo#convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary}.
*/
private static class PrimitiveUnboundedReadTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
SplittableParDo.PrimitiveUnboundedRead<T>> {

if (rawSource.requiresDeduping()) {
source =
nonDedupSource
.keyBy(new ValueWithRecordIdKeySelector<>())
.transform(
"deduping",
outputTypeInfo,
new DedupingOperator<>(context.getPipelineOptions()))
.uid(format("%s/__deduplicated__", fullName));
} else {
source =
nonDedupSource
.flatMap(new StripIdsMap<>(context.getPipelineOptions()))
.returns(outputTypeInfo);
}
} catch (Exception e) {
throw new RuntimeException("Error while translating UnboundedSource: " + rawSource, e);
}
@Override
public void translateNode(
SplittableParDo.PrimitiveUnboundedRead<T> transform,
FlinkStreamingTranslationContext context) {
translateUnboundedSource(transform.getSource(), transform.getName(), context);
}
}

/**
* Translator for {@link SplittableParDo.PrimitiveBoundedRead}.
*
* <p>This handles the case where Read.Bounded is converted to PrimitiveBoundedRead by {@link
* SplittableParDo#convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary}.
*/
private static class PrimitiveBoundedReadTranslator<T>
extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<
SplittableParDo.PrimitiveBoundedRead<T>> {

context.setOutputDataStream(output, source);
@Override
public void translateNode(
SplittableParDo.PrimitiveBoundedRead<T> transform,
FlinkStreamingTranslationContext context) {
translateBoundedSource(transform.getSource(), transform.getName(), context);
}
}

Expand Down Expand Up @@ -372,11 +475,6 @@ private static class BoundedReadSourceTranslator<T>
@Override
public void translateNode(
PTransform<PBegin, PCollection<T>> transform, FlinkStreamingTranslationContext context) {
PCollection<T> output = context.getOutput(transform);

TypeInformation<WindowedValue<T>> outputTypeInfo =
context.getTypeInfo(context.getOutput(transform));

BoundedSource<T> rawSource;
try {
rawSource =
Expand All @@ -386,43 +484,7 @@ public void translateNode(
} catch (IOException e) {
throw new RuntimeException(e);
}

String fullName = getCurrentTransformName(context);
int parallelism =
context.getExecutionEnvironment().getMaxParallelism() > 0
? context.getExecutionEnvironment().getMaxParallelism()
: context.getExecutionEnvironment().getParallelism();

FlinkBoundedSource<T> flinkBoundedSource =
FlinkSource.bounded(
transform.getName(),
rawSource,
new SerializablePipelineOptions(context.getPipelineOptions()),
parallelism);

TypeInformation<WindowedValue<T>> typeInfo = context.getTypeInfo(output);

SingleOutputStreamOperator<WindowedValue<T>> source;
try {
source =
context
.getExecutionEnvironment()
.fromSource(
flinkBoundedSource, WatermarkStrategy.noWatermarks(), fullName, outputTypeInfo)
.uid(fullName)
.returns(typeInfo);

if (!context.isStreaming()
&& context
.getPipelineOptions()
.as(FlinkPipelineOptions.class)
.getForceSlotSharingGroup()) {
source = source.slotSharingGroup(FORCED_SLOT_GROUP);
}
} catch (Exception e) {
throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e);
}
context.setOutputDataStream(output, source);
translateBoundedSource(rawSource, transform.getName(), context);
}
}

Expand Down
Loading
Loading