
[Bug]: Latest.Globally() issues in global window #35934

@damccorm

Description


What happened?

I was trying to repro an issue with slowly updating side inputs, and I found that the following code only logs one element:

import logging
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.combiners import Latest
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, Repeatedly
from apache_beam.transforms.window import GlobalWindows

# Assumed: streaming options for a Dataflow job (the report does not show them).
options = PipelineOptions(streaming=True)
pipeline = beam.Pipeline(options=options)

start_timestamp = time.time()  # start timestamp of the periodic impulse
main_input_fire_interval = 60  # interval in seconds at which the main input PCollection is emitted
side_input_fire_interval = 60  # interval in seconds at which the side input PCollection is emitted


def add_timestamp(element, timestamp=beam.DoFn.TimestampParam):
    # Pair each element with its event timestamp.
    return [(element, timestamp)]


side_input = (
    pipeline
    | "SideInputImpulse" >> PeriodicImpulse(
        start_timestamp=start_timestamp,
        fire_interval=side_input_fire_interval)
    | "Window" >> beam.WindowInto(
        GlobalWindows(),
        # Define the trigger. Since the global window never closes, we must tell
        # the runner WHEN to emit a result. This trigger fires repeatedly, once
        # for every new element that arrives.
        trigger=Repeatedly(AfterCount(1)),
        # Define the accumulation mode. DISCARDING tells the runner to forget old
        # values after a trigger fires. This prevents state from growing
        # indefinitely and ensures Latest.Globally() only considers the most
        # recent element.
        accumulation_mode=AccumulationMode.DISCARDING)
    | "Add timestamp" >> beam.FlatMap(add_timestamp)
)

side_input | "LocalLatest" >> Latest.Globally() | "show latest" >> beam.Map(logging.info)

pipeline.run()
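The comments in the repro describe how Repeatedly(AfterCount(1)) and DISCARDING accumulation are meant to feed Latest.Globally(). A minimal pure-Python sketch of that intended per-firing behavior follows, assuming each firing combines only the elements that arrived since the previous firing and that "latest" means the element with the greatest timestamp. The names latest and fire_discarding are hypothetical; this is an illustration, not Beam's implementation.

```python
from typing import Any, Iterable, List, Optional, Tuple

# A (value, event_timestamp) pair, mimicking a timestamped element.
TimestampedValue = Tuple[Any, float]


def latest(pane: Iterable[TimestampedValue]) -> Optional[Any]:
    """Return the value with the greatest timestamp in one pane, or None if empty.

    Hypothetical stand-in for what Latest.Globally() computes per firing.
    """
    best: Optional[TimestampedValue] = None
    for value, ts in pane:
        if best is None or ts > best[1]:
            best = (value, ts)
    return None if best is None else best[0]


def fire_discarding(elements: List[TimestampedValue], count: int = 1) -> List[Any]:
    """Simulate Repeatedly(AfterCount(count)) with DISCARDING accumulation.

    Each firing sees only elements buffered since the previous firing;
    elements still buffered at the end have not triggered a firing yet.
    """
    outputs: List[Any] = []
    buffer: List[TimestampedValue] = []
    for element in elements:
        buffer.append(element)
        if len(buffer) >= count:
            outputs.append(latest(buffer))
            buffer = []  # DISCARDING: forget this pane's contents after firing
    return outputs


print(fire_discarding([("a", 0.0), ("b", 60.0), ("c", 120.0)]))
```

With count=1, every arriving element produces its own firing, which is the "stream of elements" the trigger is expected to yield.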

However, if I change Latest.Globally() to Latest.Globally().without_defaults(), it repeatedly yields a stream of elements, as expected from the trigger. I observed this on Dataflow; this pattern isn't supported on Prism, and I haven't validated it on other runners. As best I can tell, the version with defaults is getting hung up on side inputs.
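The side-input suspicion can be pictured with a toy pure-Python model. It assumes (not confirmed in this report) that the default-producing path of Latest.Globally() in the global window emits its result through a single main-input element that reads the combined value as a side input, whereas without_defaults() passes every firing straight downstream. Both function names and the fired_before_read parameter are hypothetical; this is an illustration, not Beam code.

```python
from typing import Any, List, Optional


def emit_without_defaults(firings: List[Any]) -> List[Any]:
    # Every trigger firing of the combine flows straight downstream.
    return list(firings)


def emit_with_one_shot_side_input(firings: List[Any],
                                  fired_before_read: int,
                                  default: Optional[Any] = None) -> List[Any]:
    # Hypothetical model: a single main-input element is processed exactly once
    # and reads the combined result as a side input at that moment.
    # `fired_before_read` is how many firings had reached the side input by then.
    visible = firings[:fired_before_read]
    # The one-shot reader sees the most recent visible firing, or the default.
    value = visible[-1] if visible else default
    # Later firings update the side input, but nothing re-reads it.
    return [value]


firings = ["a", "b", "c"]
print(emit_without_defaults(firings))
print(emit_with_one_shot_side_input(firings, fired_before_read=1))
```

Under these assumptions the only difference is whether downstream output is driven by every firing or by one element processed once, which would match the one-element versus stream contrast described above.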

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata


Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests
