Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -452,14 +452,16 @@ public <T> void outputWindowedValue(
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
// Capturing here to eliminate mutable field access from lambda
Instant elemTimestamp = elem.getTimestamp();
builderSupplier
.builder(output)
.setTimestamp(timestamp)
.setWindows(windows)
.setPaneInfo(paneInfo)
.setReceiver(
wv -> {
checkTimestamp(elem.getTimestamp(), wv.getTimestamp());
checkTimestamp(elemTimestamp, wv.getTimestamp());
SimpleDoFnRunner.this.outputWindowedValue(tag, wv);
})
.output();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1582,10 +1582,10 @@ public org.apache.beam.sdk.state.Timer get(String dynamicTimerTag) {
}

@SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected
private void checkTimestamp(Instant timestamp) {
private void checkTimestamp(Instant inputTimestamp, Instant timestamp) {
Instant lowerBound;
try {
lowerBound = currentElement.getTimestamp().minus(doFn.getAllowedTimestampSkew());
lowerBound = inputTimestamp.minus(doFn.getAllowedTimestampSkew());
} catch (ArithmeticException e) {
lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
}
Expand All @@ -1598,7 +1598,7 @@ private void checkTimestamp(Instant timestamp) {
+ "than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details on "
+ "changing the allowed skew.",
timestamp,
currentElement.getTimestamp(),
inputTimestamp,
doFn.getAllowedTimestampSkew().getMillis() >= Integer.MAX_VALUE
? doFn.getAllowedTimestampSkew()
: PeriodFormat.getDefault().print(doFn.getAllowedTimestampSkew().toPeriod()),
Expand Down Expand Up @@ -1856,11 +1856,12 @@ private class NonWindowObservingProcessBundleContext

@Override
public OutputBuilder<OutputT> builder(OutputT value) {
Instant elemTimestamp = currentElement.getTimestamp();
return WindowedValues.builder(currentElement)
.withValue(value)
.setReceiver(
windowedValue -> {
checkTimestamp(windowedValue.getTimestamp());
checkTimestamp(elemTimestamp, windowedValue.getTimestamp());
outputTo(mainOutputConsumer, windowedValue);
});
}
Expand Down Expand Up @@ -1902,7 +1903,7 @@ public void outputWindowedValue(

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
checkTimestamp(timestamp);
checkTimestamp(currentElement.getTimestamp(), timestamp);
FnDataReceiver<WindowedValue<T>> consumer =
(FnDataReceiver) localNameToConsumer.get(tag.getId());
if (consumer == null) {
Expand All @@ -1921,7 +1922,7 @@ public <T> void outputWindowedValue(
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
checkTimestamp(timestamp);
checkTimestamp(currentElement.getTimestamp(), timestamp);
FnDataReceiver<WindowedValue<T>> consumer =
(FnDataReceiver) localNameToConsumer.get(tag.getId());
if (consumer == null) {
Expand Down
Loading