From 11e69b9bd512443b26e2fc9e9c45e368dddba0d1 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 17 Nov 2025 12:51:41 -0500 Subject: [PATCH] Capture element timestamp in DoFnRunners so they do not access mutable fields --- .../apache/beam/runners/core/SimpleDoFnRunner.java | 4 +++- .../org/apache/beam/fn/harness/FnApiDoFnRunner.java | 13 +++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 9cce1f71f2a1..2caa75bfa465 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -452,6 +452,8 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo) { + // Capturing here to eliminate mutable field access from lambda + Instant elemTimestamp = elem.getTimestamp(); builderSupplier .builder(output) .setTimestamp(timestamp) @@ -459,7 +461,7 @@ public void outputWindowedValue( .setPaneInfo(paneInfo) .setReceiver( wv -> { - checkTimestamp(elem.getTimestamp(), wv.getTimestamp()); + checkTimestamp(elemTimestamp, wv.getTimestamp()); SimpleDoFnRunner.this.outputWindowedValue(tag, wv); }) .output(); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 1b7d75f6ec32..28914eadb954 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -1582,10 +1582,10 @@ public org.apache.beam.sdk.state.Timer get(String dynamicTimerTag) { } @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected - private void checkTimestamp(Instant timestamp) { + private void checkTimestamp(Instant inputTimestamp, Instant timestamp) { Instant lowerBound; try { - lowerBound = currentElement.getTimestamp().minus(doFn.getAllowedTimestampSkew()); + lowerBound = inputTimestamp.minus(doFn.getAllowedTimestampSkew()); } catch (ArithmeticException e) { lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE; } @@ -1598,7 +1598,7 @@ private void checkTimestamp(Instant timestamp) { + "than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details on " + "changing the allowed skew.", timestamp, - currentElement.getTimestamp(), + inputTimestamp, doFn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE ? doFn.getAllowedTimestampSkew() : PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod()), @@ -1856,11 +1856,12 @@ private class NonWindowObservingProcessBundleContext @Override public OutputBuilder builder(OutputT value) { + Instant elemTimestamp = currentElement.getTimestamp(); return WindowedValues.builder(currentElement) .withValue(value) .setReceiver( windowedValue -> { - checkTimestamp(windowedValue.getTimestamp()); + checkTimestamp(elemTimestamp, windowedValue.getTimestamp()); outputTo(mainOutputConsumer, windowedValue); }); } @@ -1902,7 +1903,7 @@ public void outputWindowedValue( @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - checkTimestamp(timestamp); + checkTimestamp(currentElement.getTimestamp(), timestamp); FnDataReceiver> consumer = (FnDataReceiver) localNameToConsumer.get(tag.getId()); if (consumer == null) { @@ -1921,7 +1922,7 @@ public void outputWindowedValue( Instant timestamp, Collection windows, PaneInfo paneInfo) { - checkTimestamp(timestamp); + checkTimestamp(currentElement.getTimestamp(), timestamp); FnDataReceiver> consumer = (FnDataReceiver) localNameToConsumer.get(tag.getId()); if (consumer == null) {