Skip to content

Commit 81bb506

Browse files
authored
[Java] Dataflow runner v1 - Propagate drain mode (#36534)
1 parent ead63ad commit 81bb506

33 files changed

+195
-26
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
389389
serializedWorkItemSize,
390390
watermarks,
391391
processingContext,
392+
drainMode,
392393
getWorkStreamLatencies) ->
393394
computationStateCache
394395
.get(processingContext.computationId())
@@ -401,6 +402,7 @@ private StreamingWorkerHarnessFactoryOutput createFanOutStreamingEngineWorkerHar
401402
serializedWorkItemSize,
402403
watermarks,
403404
processingContext,
405+
drainMode,
404406
getWorkStreamLatencies);
405407
}),
406408
ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache),

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,10 @@ public boolean workIsFailed() {
196196
return work != null && work.isFailed();
197197
}
198198

199+
public boolean getDrainMode() {
200+
return work != null ? work.getDrainMode() : false;
201+
}
202+
199203
public boolean offsetBasedDeduplicationSupported() {
200204
return activeReader != null
201205
&& activeReader.getCurrentSource().offsetBasedDeduplicationSupported();
@@ -820,7 +824,10 @@ public <W extends BoundedWindow> TimerData getNextFiredTimer(Coder<W> windowCode
820824
.transform(
821825
timer ->
822826
WindmillTimerInternals.windmillTimerToTimerData(
823-
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timer, windowCoder))
827+
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
828+
timer,
829+
windowCoder,
830+
getDrainMode()))
824831
.iterator();
825832
}
826833

@@ -880,7 +887,10 @@ public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> window
880887
.transform(
881888
timer ->
882889
WindmillTimerInternals.windmillTimerToTimerData(
883-
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
890+
WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
891+
timer,
892+
windowCoder,
893+
getDrainMode()))
884894
.iterator());
885895
}
886896

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.io.InputStream;
2525
import java.util.Collection;
2626
import java.util.Map;
27+
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
2728
import org.apache.beam.runners.dataflow.util.CloudObject;
2829
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
2930
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
@@ -117,8 +118,16 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
117118
Collection<? extends BoundedWindow> windows =
118119
WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata());
119120
PaneInfo paneInfo = WindmillSink.decodeMetadataPane(message.getMetadata());
121+
/**
122+
* https://s.apache.org/beam-drain-mode - propagate drain bit if aggregation/expiry induced by
123+
* drain happened upstream
124+
*/
125+
boolean drainingValueFromUpstream = false;
120126
if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
121-
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
127+
BeamFnApi.Elements.ElementMetadata elementMetadata =
128+
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
129+
drainingValueFromUpstream =
130+
elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING;
122131
}
123132
if (valueCoder instanceof KvCoder) {
124133
KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
@@ -128,12 +137,18 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
128137
@SuppressWarnings("unchecked")
129138
T result =
130139
(T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data));
131-
// todo #33176 propagate metadata to windowed value
132-
return WindowedValues.of(result, timestampMillis, windows, paneInfo);
140+
return WindowedValues.of(
141+
result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream);
133142
} else {
134143
notifyElementRead(data.available() + metadata.available());
135-
// todo #33176 propagate metadata to windowed value
136-
return WindowedValues.of(decode(valueCoder, data), timestampMillis, windows, paneInfo);
144+
return WindowedValues.of(
145+
decode(valueCoder, data),
146+
timestampMillis,
147+
windows,
148+
paneInfo,
149+
null,
150+
null,
151+
drainingValueFromUpstream);
137152
}
138153
}
139154

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Collection;
2525
import java.util.List;
2626
import java.util.Objects;
27+
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
2728
import org.apache.beam.runners.core.KeyedWorkItem;
2829
import org.apache.beam.runners.core.KeyedWorkItemCoder;
2930
import org.apache.beam.runners.core.TimerInternals.TimerData;
@@ -60,6 +61,8 @@ public class WindmillKeyedWorkItem<K, ElemT> implements KeyedWorkItem<K, ElemT>
6061

6162
private final Windmill.WorkItem workItem;
6263
private final K key;
64+
// used to inform that timer was caused by drain
65+
private final boolean drainMode;
6366

6467
private final transient Coder<? extends BoundedWindow> windowCoder;
6568
private final transient Coder<Collection<? extends BoundedWindow>> windowsCoder;
@@ -70,12 +73,14 @@ public WindmillKeyedWorkItem(
7073
Windmill.WorkItem workItem,
7174
Coder<? extends BoundedWindow> windowCoder,
7275
Coder<Collection<? extends BoundedWindow>> windowsCoder,
73-
Coder<ElemT> valueCoder) {
76+
Coder<ElemT> valueCoder,
77+
boolean drainMode) {
7478
this.key = key;
7579
this.workItem = workItem;
7680
this.windowCoder = windowCoder;
7781
this.windowsCoder = windowsCoder;
7882
this.valueCoder = valueCoder;
83+
this.drainMode = drainMode;
7984
}
8085

8186
@Override
@@ -93,7 +98,10 @@ public Iterable<TimerData> timersIterable() {
9398
.transform(
9499
timer ->
95100
WindmillTimerInternals.windmillTimerToTimerData(
96-
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timer, windowCoder));
101+
WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
102+
timer,
103+
windowCoder,
104+
drainMode));
97105
}
98106

99107
@Override
@@ -108,13 +116,21 @@ public Iterable<WindowedValue<ElemT>> elementsIterable() {
108116
Collection<? extends BoundedWindow> windows =
109117
WindmillSink.decodeMetadataWindows(windowsCoder, message.getMetadata());
110118
PaneInfo paneInfo = WindmillSink.decodeMetadataPane(message.getMetadata());
119+
/**
120+
* https://s.apache.org/beam-drain-mode - propagate drain bit if aggregation/expiry
121+
* induced by drain happened upstream
122+
*/
123+
boolean drainingValueFromUpstream = false;
111124
if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
112-
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
125+
BeamFnApi.Elements.ElementMetadata elementMetadata =
126+
WindmillSink.decodeAdditionalMetadata(windowsCoder, message.getMetadata());
127+
drainingValueFromUpstream =
128+
elementMetadata.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING;
113129
}
114130
InputStream inputStream = message.getData().newInput();
115131
ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
116-
// todo #33176 specify additional metadata in the future
117-
return WindowedValues.of(value, timestamp, windows, paneInfo);
132+
return WindowedValues.of(
133+
value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream);
118134
} catch (IOException e) {
119135
throw new RuntimeException(e);
120136
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,10 @@ static Timer timerDataToWindmillTimer(
301301
}
302302

303303
public static TimerData windmillTimerToTimerData(
304-
WindmillNamespacePrefix prefix, Timer timer, Coder<? extends BoundedWindow> windowCoder) {
304+
WindmillNamespacePrefix prefix,
305+
Timer timer,
306+
Coder<? extends BoundedWindow> windowCoder,
307+
boolean draining) {
305308

306309
// The tag is a path-structure string but cheaper to parse than a proper URI. It follows
307310
// this pattern, where no component but the ID can contain a slash
@@ -395,6 +398,8 @@ public static TimerData windmillTimerToTimerData(
395398
timestamp,
396399
outputTimestamp,
397400
timerTypeToTimeDomain(timer.getType()));
401+
// todo add draining (https://github.com/apache/beam/issues/36884)
402+
398403
}
399404

400405
private static boolean useNewTimerTagEncoding(TimerData timerData) {

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() throw
119119
final K key = keyCoder.decode(context.getSerializedKey().newInput(), Coder.Context.OUTER);
120120
final WorkItem workItem = context.getWorkItem();
121121
KeyedWorkItem<K, T> keyedWorkItem =
122-
new WindmillKeyedWorkItem<>(key, workItem, windowCoder, windowsCoder, valueCoder);
122+
new WindmillKeyedWorkItem<>(
123+
key, workItem, windowCoder, windowsCoder, valueCoder, context.getDrainMode());
123124
final boolean isEmptyWorkItem =
124125
(Iterables.isEmpty(keyedWorkItem.timersIterable())
125126
&& Iterables.isEmpty(keyedWorkItem.elementsIterable()));

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,19 +78,22 @@ public final class Work implements RefreshableWork {
7878
private volatile TimedState currentState;
7979
private volatile boolean isFailed;
8080
private volatile String processingThreadName = "";
81+
private final boolean drainMode;
8182

8283
private Work(
8384
WorkItem workItem,
8485
long serializedWorkItemSize,
8586
Watermarks watermarks,
8687
ProcessingContext processingContext,
88+
boolean drainMode,
8789
Supplier<Instant> clock) {
8890
this.shardedKey = ShardedKey.create(workItem.getKey(), workItem.getShardingKey());
8991
this.workItem = workItem;
9092
this.serializedWorkItemSize = serializedWorkItemSize;
9193
this.processingContext = processingContext;
9294
this.watermarks = watermarks;
9395
this.clock = clock;
96+
this.drainMode = drainMode;
9497
this.startTime = clock.get();
9598
Preconditions.checkState(EMPTY_ENUM_MAP.isEmpty());
9699
// Create by passing EMPTY_ENUM_MAP to avoid recreating
@@ -110,8 +113,10 @@ public static Work create(
110113
long serializedWorkItemSize,
111114
Watermarks watermarks,
112115
ProcessingContext processingContext,
116+
boolean drainMode,
113117
Supplier<Instant> clock) {
114-
return new Work(workItem, serializedWorkItemSize, watermarks, processingContext, clock);
118+
return new Work(
119+
workItem, serializedWorkItemSize, watermarks, processingContext, drainMode, clock);
115120
}
116121

117122
public static ProcessingContext createProcessingContext(
@@ -207,6 +212,10 @@ public State getState() {
207212
return currentState.state();
208213
}
209214

215+
public boolean getDrainMode() {
216+
return drainMode;
217+
}
218+
210219
public void setState(State state) {
211220
Instant now = clock.get();
212221
totalDurationPerState.compute(

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ private void streamingEngineDispatchLoop(
155155
(computationId,
156156
inputDataWatermark,
157157
synchronizedProcessingTime,
158+
drainMode,
158159
workItem,
159160
serializedWorkItemSize,
160161
getWorkStreamLatencies) ->
@@ -178,6 +179,7 @@ private void streamingEngineDispatchLoop(
178179
getDataClient,
179180
workCommitter::commit,
180181
heartbeatSender),
182+
drainMode,
181183
getWorkStreamLatencies);
182184
}));
183185
try {
@@ -239,6 +241,7 @@ private void applianceDispatchLoop(Supplier<Windmill.GetWorkResponse> getWorkFn)
239241
watermarks.setOutputDataWatermark(workItem.getOutputDataWatermark()).build(),
240242
Work.createProcessingContext(
241243
computationId, getDataClient, workCommitter::commit, heartbeatSender),
244+
computationWork.getDrainMode(),
242245
/* getWorkStreamLatencies= */ ImmutableList.of());
243246
}
244247
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,17 @@ private static ComputationMetadata fromProto(
124124
metadataProto.getComputationId(),
125125
WindmillTimeUtils.windmillToHarnessWatermark(metadataProto.getInputDataWatermark()),
126126
WindmillTimeUtils.windmillToHarnessWatermark(
127-
metadataProto.getDependentRealtimeInputWatermark()));
127+
metadataProto.getDependentRealtimeInputWatermark()),
128+
metadataProto.getDrainMode());
128129
}
129130

130131
abstract String computationId();
131132

132133
abstract @Nullable Instant inputDataWatermark();
133134

134135
abstract @Nullable Instant synchronizedProcessingTime();
136+
137+
abstract boolean drainMode();
135138
}
136139

137140
@AutoValue

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ private void consumeAssembledWorkItem(AssembledWorkItem assembledWorkItem) {
281281
assembledWorkItem.bufferedSize(),
282282
createWatermarks(workItem, metadata),
283283
createProcessingContext(metadata.computationId()),
284+
metadata.drainMode(),
284285
assembledWorkItem.latencyAttributions());
285286
budgetTracker.recordBudgetReceived(assembledWorkItem.bufferedSize());
286287
GetWorkBudget extension = budgetTracker.computeBudgetExtension();

0 commit comments

Comments
 (0)