Add primitive read translators #37036
…ink streaming runner
This PR adds explicit translators for SplittableParDo.PrimitiveUnboundedRead and SplittableParDo.PrimitiveBoundedRead to the Flink streaming runner, enabling unbounded sources like KinesisIO.read() to work with the classic Flink runner on AWS Managed Flink.
Problem:
- Read.Unbounded.expand() wraps sources with UnboundedSourceAsSDFWrapperFn
- FlinkRunner converts SDF-based reads to PrimitiveUnboundedRead
- FlinkStreamingTransformTranslators.getTranslator() had no translator for PrimitiveUnboundedRead, causing 'No translator known' errors
Solution:
- Add PrimitiveUnboundedReadTranslator that extracts UnboundedSource directly from transform.getSource() instead of using ReadTranslation
- Add PrimitiveBoundedReadTranslator for bounded sources
- Modify getTranslator() to check for these transform types explicitly
Testing:
- Added 4 new unit tests
- All 383 Flink runner tests pass
…veBoundedRead
This commit adds explicit translators for SplittableParDo.PrimitiveUnboundedRead and SplittableParDo.PrimitiveBoundedRead to the Flink streaming transform translators.
The Flink classic runner calls convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary() when NOT using the beam_fn_api experiment, which converts SDF-wrapped reads to PrimitiveUnboundedRead and PrimitiveBoundedRead. Without translators for these transforms, pipelines using unbounded sources like KinesisIO.read() fail with: 'No translator known for PrimitiveUnboundedRead'
Changes:
- Add PrimitiveUnboundedReadTranslator class
- Add PrimitiveBoundedReadTranslator class
- Modify getTranslator() to handle these transforms before URN lookup
- Add unit tests for the new translators
- Update CHANGES.md with bugfix entry
Related to apache#20530
Summary of Changes
Hello @dnamaz, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request resolves a critical issue in the Flink classic runner where pipelines using certain read operations, specifically those internally converted to "PrimitiveUnboundedRead" or "PrimitiveBoundedRead" by "SplittableParDo", would fail during translation because no translators were registered for them. By implementing dedicated translators for these primitive read types, the change lets such pipelines, including those using unbounded sources like KinesisIO, execute successfully on the Flink runner.
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
Pull Request: Add PrimitiveUnboundedRead/PrimitiveBoundedRead Translators to Flink Runner
Title
[Flink Runner] Add translators for PrimitiveUnboundedRead and PrimitiveBoundedRead
Description
What is this PR doing?
This PR adds explicit translators for SplittableParDo.PrimitiveUnboundedRead and SplittableParDo.PrimitiveBoundedRead to the Flink streaming transform translators. These translators handle the case where Read.Unbounded and Read.Bounded are converted to primitive reads by SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary().
Why is this needed?
The Flink classic runner (non-portable) calls convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary() when NOT using the beam_fn_api experiment. This converts SDF-wrapped reads to PrimitiveUnboundedRead and PrimitiveBoundedRead transforms. However, there were no registered translators for these transforms, causing pipelines using unbounded sources (like KinesisIO.read()) to fail with: 'No translator known for PrimitiveUnboundedRead'
How does it work?
The new translators:
- PrimitiveUnboundedReadTranslator: Extracts the UnboundedSource from PrimitiveUnboundedRead.getSource() and creates a FlinkUnboundedSource, following the same pattern as the existing UnboundedReadSourceTranslator.
- PrimitiveBoundedReadTranslator: Extracts the BoundedSource from PrimitiveBoundedRead.getSource() and creates a FlinkBoundedSource, following the same pattern as the existing BoundedReadSourceTranslator.
The key difference from the existing translators is that they retrieve the source directly from the transform (transform.getSource()) rather than using ReadTranslation.unboundedSourceFromTransform(), since PrimitiveUnboundedRead and PrimitiveBoundedRead are not standard Read transforms with URNs.
Changes
FlinkStreamingTransformTranslators.java:
- Add PrimitiveUnboundedReadTranslator<T> class
- Add PrimitiveBoundedReadTranslator<T> class
- Modify getTranslator() to check for PrimitiveUnboundedRead and PrimitiveBoundedRead instances before URN lookup
FlinkStreamingTransformTranslatorsTest.java:
- Add getTranslatorReturnsPrimitiveUnboundedReadTranslator() test
- Add getTranslatorReturnsPrimitiveBoundedReadTranslator() test
- Add primitiveUnboundedReadTranslatorProducesCorrectSource() test
- Add primitiveBoundedReadTranslatorProducesCorrectSource() test
CHANGES.md:
- Add bugfix entry
Issue
Fixes #XXXXX
Created issue: #37035
Related to #20530 (Use SDF read as default)
Checklist
- Code formatted with ./gradlew :runners:flink:1.18:spotlessApply
- Unit tests added in FlinkStreamingTransformTranslatorsTest.java
- All tests pass (./gradlew :runners:flink:1.18:test)
- CHANGES.md updated and formatted with ./gradlew formatChanges
Testing
Unit Tests
./gradlew :runners:flink:1.18:test # BUILD SUCCESSFUL - all tests pass
Integration Testing
Tested on AWS Managed Apache Flink with a real pipeline using KinesisIO.read():
Test environment:
Backwards Compatibility
This change is fully backwards compatible:
- The new translators only apply when PrimitiveUnboundedRead or PrimitiveBoundedRead transforms are present (which previously caused failures)
- Read.Bounded and Read.Unbounded with URNs continue to use the existing ReadSourceTranslator
Performance
No performance impact expected. The new translators use the same FlinkUnboundedSource and FlinkBoundedSource implementations as the existing translators.
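Sketch of the dispatch order
The lookup order described under "How does it work?" (instance checks for the primitive reads first, URN lookup second) can be illustrated with a small self-contained sketch. This is not the code in this PR and it does not use Beam or Flink classes: every type below, the URN string "example:urn:read", and the translator names returned as plain strings are hypothetical stand-ins chosen only to show the control flow.

```java
import java.util.HashMap;
import java.util.Map;

public class TranslatorDispatchSketch {
  /** Hypothetical stand-in for a pipeline transform; not a Beam type. */
  interface Transform {}

  /** Stand-in for a primitive unbounded read: carries its source, has no URN. */
  static final class PrimitiveUnboundedRead implements Transform {
    private final String source;
    PrimitiveUnboundedRead(String source) { this.source = source; }
    String getSource() { return source; }
  }

  /** Stand-in for a primitive bounded read: carries its source, has no URN. */
  static final class PrimitiveBoundedRead implements Transform {
    private final String source;
    PrimitiveBoundedRead(String source) { this.source = source; }
    String getSource() { return source; }
  }

  /** Stand-in for a transform that is identified by a URN. */
  static final class UrnTransform implements Transform {
    private final String urn;
    UrnTransform(String urn) { this.urn = urn; }
    String getUrn() { return urn; }
  }

  /** Hypothetical URN registry; the key is made up for this sketch. */
  private static final Map<String, String> URN_TRANSLATORS = new HashMap<>();
  static {
    URN_TRANSLATORS.put("example:urn:read", "ReadSourceTranslator");
  }

  /**
   * Returns the name of the translator that would handle the transform,
   * or null if none is known. Instance checks run before the URN lookup
   * because the primitive reads have no URN to look up.
   */
  static String getTranslator(Transform transform) {
    if (transform instanceof PrimitiveUnboundedRead) {
      return "PrimitiveUnboundedReadTranslator";
    }
    if (transform instanceof PrimitiveBoundedRead) {
      return "PrimitiveBoundedReadTranslator";
    }
    if (transform instanceof UrnTransform) {
      return URN_TRANSLATORS.get(((UrnTransform) transform).getUrn());
    }
    return null;
  }

  /** Reads the source straight from the transform, as the new translators do. */
  static String sourceOf(Transform transform) {
    if (transform instanceof PrimitiveUnboundedRead) {
      return ((PrimitiveUnboundedRead) transform).getSource();
    }
    if (transform instanceof PrimitiveBoundedRead) {
      return ((PrimitiveBoundedRead) transform).getSource();
    }
    throw new IllegalArgumentException("Transform does not carry a source");
  }

  public static void main(String[] args) {
    Transform read = new PrimitiveUnboundedRead("kinesis-stream");
    System.out.println(getTranslator(read) + " <- " + sourceOf(read));
    System.out.println(getTranslator(new UrnTransform("example:urn:read")));
  }
}
```

In this sketch a URN-identified transform still resolves through the registry, which mirrors the backwards-compatibility point above: the added instance checks only affect transforms that previously had no translator.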