@@ -192,6 +192,12 @@ public static String getCurrentTransformName(FlinkStreamingTranslationContext co
192192 return context .getCurrentTransform ().getFullName ();
193193 }
194194
195+ /** Returns the parallelism to use for source operators. */
196+ private static int getSourceParallelism (FlinkStreamingTranslationContext context ) {
197+ int maxParallelism = context .getExecutionEnvironment ().getMaxParallelism ();
198+ return maxParallelism > 0 ? maxParallelism : context .getExecutionEnvironment ().getParallelism ();
199+ }
200+
195201 // --------------------------------------------------------------------------------------------
196202 // Transformation Implementations
197203 // --------------------------------------------------------------------------------------------
@@ -222,10 +228,7 @@ private static <T> void translateUnboundedSource(
222228
223229 String fullName = getCurrentTransformName (context );
224230 try {
225- int parallelism =
226- context .getExecutionEnvironment ().getMaxParallelism () > 0
227- ? context .getExecutionEnvironment ().getMaxParallelism ()
228- : context .getExecutionEnvironment ().getParallelism ();
231+ int parallelism = getSourceParallelism (context );
229232
230233 FlinkUnboundedSource <T > unboundedSource =
231234 FlinkSource .unbounded (
@@ -274,10 +277,7 @@ private static <T> void translateBoundedSource(
274277 TypeInformation <WindowedValue <T >> outputTypeInfo = context .getTypeInfo (output );
275278
276279 String fullName = getCurrentTransformName (context );
277- int parallelism =
278- context .getExecutionEnvironment ().getMaxParallelism () > 0
279- ? context .getExecutionEnvironment ().getMaxParallelism ()
280- : context .getExecutionEnvironment ().getParallelism ();
280+ int parallelism = getSourceParallelism (context );
281281
282282 FlinkBoundedSource <T > flinkBoundedSource =
283283 FlinkSource .bounded (
0 commit comments