@dnamaz dnamaz commented Dec 6, 2025

Pull Request: Add PrimitiveUnboundedRead/PrimitiveBoundedRead Translators to Flink Runner

Title

[Flink Runner] Add translators for PrimitiveUnboundedRead and PrimitiveBoundedRead

Description

What is this PR doing?

This PR adds explicit translators for SplittableParDo.PrimitiveUnboundedRead and SplittableParDo.PrimitiveBoundedRead to the Flink streaming transform translators. These translators handle the case where Read.Unbounded and Read.Bounded are converted to primitive reads by SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary().

Why is this needed?

The Flink classic runner (non-portable) calls convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary() when NOT using the beam_fn_api experiment. This converts SDF-wrapped reads to PrimitiveUnboundedRead and PrimitiveBoundedRead transforms. However, there were no registered translators for these transforms, causing pipelines using unbounded sources (like KinesisIO.read()) to fail with:

java.lang.IllegalStateException: No translator known for org.apache.beam.sdk.util.construction.SplittableParDo$PrimitiveUnboundedRead

How does it work?

The new translators:

  1. PrimitiveUnboundedReadTranslator: Extracts the UnboundedSource from PrimitiveUnboundedRead.getSource() and creates a FlinkUnboundedSource, following the same pattern as the existing UnboundedReadSourceTranslator.

  2. PrimitiveBoundedReadTranslator: Extracts the BoundedSource from PrimitiveBoundedRead.getSource() and creates a FlinkBoundedSource, following the same pattern as the existing BoundedReadSourceTranslator.

The key difference from the existing translators is that the new ones retrieve the source directly from the transform (transform.getSource()) rather than going through ReadTranslation.unboundedSourceFromTransform(), since PrimitiveUnboundedRead and PrimitiveBoundedRead are not standard Read transforms with URNs.
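
To make the "read the source straight off the transform" step concrete, here is a minimal, self-contained sketch. It is not the code in this PR: SourceStub, PrimitiveUnboundedReadStub, FlinkSourceStub, and translate() are hypothetical stand-ins for UnboundedSource, SplittableParDo.PrimitiveUnboundedRead, FlinkUnboundedSource, and the translator body, and the null check is an assumption added only for illustration.

```java
/** Simplified model of a translator that takes the source directly from the transform. */
final class PrimitiveReadSourceSketch {

  /** Hypothetical stand-in for a Beam UnboundedSource. */
  static final class SourceStub {
    final String name;

    SourceStub(String name) {
      this.name = name;
    }
  }

  /** Hypothetical stand-in for PrimitiveUnboundedRead: it holds the source object itself. */
  static final class PrimitiveUnboundedReadStub {
    private final SourceStub source;

    PrimitiveUnboundedReadStub(SourceStub source) {
      this.source = source;
    }

    SourceStub getSource() {
      return source;
    }
  }

  /** Hypothetical stand-in for the Flink-side wrapper around a Beam source. */
  static final class FlinkSourceStub {
    final SourceStub wrapped;

    FlinkSourceStub(SourceStub wrapped) {
      this.wrapped = wrapped;
    }
  }

  /** No URN or payload lookup: the source comes from transform.getSource(). */
  static FlinkSourceStub translate(PrimitiveUnboundedReadStub transform) {
    SourceStub source = transform.getSource();
    if (source == null) {
      // Assumption for this sketch only: a primitive read without a source is invalid.
      throw new IllegalStateException("primitive read carries no source");
    }
    return new FlinkSourceStub(source);
  }

  public static void main(String[] args) {
    SourceStub kinesisLike = new SourceStub("kinesis-like-source");
    FlinkSourceStub flinkSource = translate(new PrimitiveUnboundedReadStub(kinesisLike));
    System.out.println(flinkSource.wrapped.name);
  }
}
```

The bounded case would follow the same shape with a bounded source stand-in.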

Changes

  • FlinkStreamingTransformTranslators.java:

    • Added PrimitiveUnboundedReadTranslator<T> class
    • Added PrimitiveBoundedReadTranslator<T> class
    • Modified getTranslator() to check for PrimitiveUnboundedRead and PrimitiveBoundedRead instances before URN lookup
  • FlinkStreamingTransformTranslatorsTest.java:

    • Added getTranslatorReturnsPrimitiveUnboundedReadTranslator() test
    • Added getTranslatorReturnsPrimitiveBoundedReadTranslator() test
    • Added primitiveUnboundedReadTranslatorProducesCorrectSource() test
    • Added primitiveBoundedReadTranslatorProducesCorrectSource() test
  • CHANGES.md:

    • Added bugfix entry for 2.71.0
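
The getTranslator() change listed above (type check first, URN lookup second) can be sketched roughly as follows. This is a simplified model under stated assumptions, not the actual FlinkStreamingTransformTranslators code: the Transform interface, the stub classes, the string return values, and the single-entry URN map are all hypothetical, and the URN string is used only as an illustrative key.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of "check the transform type before the URN lookup". Not Beam code. */
final class TranslatorDispatchSketch {

  /** Hypothetical stand-in for a transform; urn() returns null when none is registered. */
  interface Transform {
    String urn();
  }

  /** Stand-in for SplittableParDo.PrimitiveUnboundedRead (modeled here as having no URN). */
  static final class PrimitiveUnboundedRead implements Transform {
    @Override
    public String urn() {
      return null;
    }
  }

  /** Stand-in for SplittableParDo.PrimitiveBoundedRead (modeled here as having no URN). */
  static final class PrimitiveBoundedRead implements Transform {
    @Override
    public String urn() {
      return null;
    }
  }

  /** Stand-in for any transform that is found through its URN. */
  static final class UrnTransform implements Transform {
    private final String urn;

    UrnTransform(String urn) {
      this.urn = urn;
    }

    @Override
    public String urn() {
      return urn;
    }
  }

  // Illustrative URN-keyed registry; translator names are plain strings in this sketch.
  private static final Map<String, String> URN_TRANSLATORS = new HashMap<>();

  static {
    URN_TRANSLATORS.put("beam:transform:read:v1", "ReadSourceTranslator");
  }

  /** Returns the translator name, or null when nothing is registered for the transform. */
  static String getTranslator(Transform transform) {
    // Explicit type checks run first, so primitive reads never reach the URN lookup.
    if (transform instanceof PrimitiveUnboundedRead) {
      return "PrimitiveUnboundedReadTranslator";
    }
    if (transform instanceof PrimitiveBoundedRead) {
      return "PrimitiveBoundedReadTranslator";
    }
    // Everything else keeps the existing URN-based path.
    String urn = transform.urn();
    return urn == null ? null : URN_TRANSLATORS.get(urn);
  }

  public static void main(String[] args) {
    System.out.println(getTranslator(new PrimitiveUnboundedRead()));
    System.out.println(getTranslator(new UrnTransform("beam:transform:read:v1")));
  }
}
```

In this model, removing the two instanceof branches makes getTranslator() return null for the primitive reads, which corresponds to the "No translator known" failure described earlier.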

Issue

Fixes #XXXXX

Created issue: #37035
Related to #20530 (Use SDF read as default)

Checklist

  • Code formatted with ./gradlew :runners:flink:1.18:spotlessApply
  • Unit tests added in FlinkStreamingTransformTranslatorsTest.java
  • All Flink runner tests pass (./gradlew :runners:flink:1.18:test)
  • CHANGES.md updated and formatted with ./gradlew formatChanges
  • No breaking changes to public API

Testing

Unit Tests

./gradlew :runners:flink:1.18:test
# BUILD SUCCESSFUL - all tests pass

Integration Testing

Tested on AWS Managed Apache Flink with a real pipeline using KinesisIO.read():

  1. Before fix: Pipeline fails during translation with "No translator known for PrimitiveUnboundedRead"
  2. After fix: Pipeline successfully translates and runs, reading records from Kinesis

Test environment:

  • AWS Managed Apache Flink (FLINK-1_18 runtime)
  • Apache Beam 2.71.0-SNAPSHOT (with this fix)
  • KinesisIO.read() connector

Backwards Compatibility

This change is fully backwards compatible:

  1. No public API changes: Only internal translator classes are added
  2. No behavior changes for existing code: The new translators only activate when PrimitiveUnboundedRead or PrimitiveBoundedRead transforms are present (which previously caused failures)
  3. Existing URN-based translation unchanged: Standard Read.Bounded and Read.Unbounded with URNs continue to use the existing ReadSourceTranslator

Performance

No performance impact expected. The new translators use the same FlinkUnboundedSource and FlinkBoundedSource implementations as the existing translators.

…ink streaming runner

This PR adds explicit translators for SplittableParDo.PrimitiveUnboundedRead
and SplittableParDo.PrimitiveBoundedRead to the Flink streaming runner,
enabling unbounded sources like KinesisIO.read() to work with the classic
Flink runner on AWS Managed Flink.

Problem:
- Read.Unbounded.expand() wraps sources with UnboundedSourceAsSDFWrapperFn
- FlinkRunner converts SDF-based reads to PrimitiveUnboundedRead
- FlinkStreamingTransformTranslators.getTranslator() had no translator for
  PrimitiveUnboundedRead, causing 'No translator known' errors

Solution:
- Add PrimitiveUnboundedReadTranslator that extracts UnboundedSource directly
  from transform.getSource() instead of using ReadTranslation
- Add PrimitiveBoundedReadTranslator for bounded sources
- Modify getTranslator() to check for these transform types explicitly

Testing:
- Added 4 new unit tests
- All 383 Flink runner tests pass
…veBoundedRead

This commit adds explicit translators for SplittableParDo.PrimitiveUnboundedRead
and SplittableParDo.PrimitiveBoundedRead to the Flink streaming transform
translators.

The Flink classic runner calls convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary()
when NOT using beam_fn_api experiment, which converts SDF-wrapped reads to
PrimitiveUnboundedRead and PrimitiveBoundedRead. Without translators for these
transforms, pipelines using unbounded sources like KinesisIO.read() fail with:
'No translator known for PrimitiveUnboundedRead'

Changes:
- Add PrimitiveUnboundedReadTranslator class
- Add PrimitiveBoundedReadTranslator class
- Modify getTranslator() to handle these transforms before URN lookup
- Add unit tests for the new translators
- Update CHANGES.md with bugfix entry

Related to apache#20530
@gemini-code-assist

Summary of Changes

Hello @dnamaz, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request fixes a translation failure in the Flink classic runner. Pipelines whose reads are internally converted to "PrimitiveUnboundedRead" or "PrimitiveBoundedRead" by "SplittableParDo" failed during translation because no translator was registered for those transforms. Adding dedicated translators for the two primitive read types lets such pipelines, including those using unbounded sources like KinesisIO, translate and run.

Highlights

  • Flink Runner Bug Fix: Addresses an "IllegalStateException" in the Flink classic runner when using unbounded sources (e.g., KinesisIO.read()) without the "beam_fn_api" experiment, caused by missing translators for "PrimitiveUnboundedRead" and "PrimitiveBoundedRead" transforms.
  • New Translators Added: Introduces "PrimitiveUnboundedReadTranslator" and "PrimitiveBoundedReadTranslator" to translate the primitive read transforms that "SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary()" produces from "Read.Unbounded" and "Read.Bounded".
  • Translation Mechanism: The new translators directly extract the "UnboundedSource" or "BoundedSource" from the respective primitive read transform, bypassing the URN-based translation used for standard "Read" transforms.

github-actions bot commented Dec 7, 2025

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment "assign set of reviewers".
