Open
Description
What happened?
I was trying to repro an issue with slowly updating side inputs, and I found that the following code only logs one element:
import logging
import time

import apache_beam as beam
from apache_beam import WindowInto
from apache_beam.transforms import core
from apache_beam.transforms.combiners import Latest
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, Repeatedly
from apache_beam.transforms.window import GlobalWindows

# `options` is the PipelineOptions for the run; its definition is not part of this snippet.
pipeline = beam.Pipeline(options=options)

start_timestamp = time.time()  # start timestamp of the periodic impulse
main_input_fire_interval = 60  # interval in seconds at which the main input PCollection is emitted.
side_input_fire_interval = 60  # interval in seconds at which the side input PCollection is emitted.

def add_timestamp(element, timestamp=core.DoFn.TimestampParam):
    return [(element, timestamp)]

side_input = (
    pipeline
    | "SideInputImpulse" >> PeriodicImpulse(
        start_timestamp=start_timestamp,
        fire_interval=side_input_fire_interval)
    | "Window" >> WindowInto(
        GlobalWindows(),
        # Define the trigger. Since the Global Window never closes, we must
        # tell the runner WHEN to emit a result. This trigger fires
        # repeatedly for every new element that arrives.
        trigger=Repeatedly(AfterCount(1)),
        # Define the accumulation mode. DISCARDING tells the runner to
        # forget old values after a trigger fires. This prevents state from
        # growing indefinitely and ensures Latest.Globally() only considers
        # the most recent element.
        accumulation_mode=AccumulationMode.DISCARDING,
    )
    | 'Add timestamp' >> core.ParDo(add_timestamp)
)

side_input | "LocalLatest" >> Latest.Globally() | "show latest" >> beam.Map(logging.info)
However, if I change Latest.Globally() to Latest.Globally().without_defaults(), the pipeline repeatedly yields a stream of elements, as expected given the trigger. I observed this on Dataflow (this pattern isn't supported on Prism, and I haven't validated it on other runners). As best I can tell, the version with defaults is getting hung up on side inputs.
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner