[Feature Request]: Make lineage tracking pluggable #36790

@shnapz

What would you like to happen?

Current lineage implementation

Currently, the lineage implementation is tightly coupled to Beam Metrics as its storage backend in both the Java and Python SDKs.

The Java static API returns two instances for source and sink:

    Lineage sources = Lineage.getSources();
    Lineage sinks = Lineage.getSinks();

The Lineage instance provides different overloads of the add() method, which sends lineage data to metrics:

    if (MetricsFlag.lineageRollupEnabled()) {
      ((BoundedTrie) this.metric).add(segments);
    } else {
      ((StringSet) this.metric).add(String.join("", segments));
    }
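To make the two storage modes in the branch above concrete, here is a minimal sketch that uses plain Java collections in place of Beam's StringSet and BoundedTrie. The class LineageStorageSketch, its Node type, and the boolean flag are hypothetical stand-ins, and the example segments are illustrative only. The real BoundedTrie, as its name suggests, also limits its size, which this sketch omits.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Sketch only: plain collections stand in for Beam's StringSet and BoundedTrie. */
final class LineageStorageSketch {

  /** A trie node keyed by segment; a node without children is a leaf. */
  static final class Node {
    final Map<String, Node> children = new TreeMap<>();
  }

  final Set<String> flat = new LinkedHashSet<>(); // stand-in for StringSet
  final Node trie = new Node();                   // stand-in for BoundedTrie (unbounded here)

  /** Mirrors the branch above, with a boolean in place of MetricsFlag. */
  void add(List<String> segments, boolean rollupEnabled) {
    if (rollupEnabled) {
      // Rollup mode: keep the segments separate, one trie level per segment.
      Node node = trie;
      for (String segment : segments) {
        node = node.children.computeIfAbsent(segment, s -> new Node());
      }
    } else {
      // Flat mode: concatenate the segments into a single string.
      flat.add(String.join("", segments));
    }
  }

  public static void main(String[] args) {
    LineageStorageSketch store = new LineageStorageSketch();
    List<String> table = List.of("bigquery:", "project.", "dataset.", "table");
    store.add(table, false);
    store.add(table, true);
    System.out.println(store.flat);                   // [bigquery:project.dataset.table]
    System.out.println(store.trie.children.keySet()); // [bigquery:]
  }
}
```

Keeping the segments separate is what lets a trie-based store summarize many entities under a shared prefix, whereas the flat mode records each entity as one opaque string.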

The Lineage class also provides two public static methods to query lineage from pipeline results:

    Set<String> query(MetricResults results, Type type, String truncatedMarker)
    Set<String> query(MetricResults results, Type type)

Requested functionality

Add a pluggable lineage tracking mechanism using ServiceLoader to decouple lineage reporting from core metrics infrastructure, enabling flexible observability without core changes. Scope: Java SDK (Python SDK in future work).

  1. This change must preserve the current public APIs.
  2. Minimize changes to the I/O connectors that produce lineage; isolate changes to the org.apache.beam.sdk.metrics.Lineage class.
  3. Use a plugin approach via ServiceLoader discovery, following the existing pattern of FileSystemRegistrar. The key advantage is that registrars on the classpath can read PipelineOptions and enable or disable themselves depending on the parameters. This satisfies the approach described in the Open Lineage ticket:

         options = PipelineOptions([
         '--openlineage_enabled=true',

  4. Do not provide any concrete plugin implementations. If no plugins are available, fall back to the existing metric-based lineage approach.
  5. Unfortunately, the static query methods expose MetricResults as an implementation detail. Leave them as is; they are out of scope for this change.
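As a rough illustration of the requirements above, the discovery and fallback logic might look like the following sketch. It is not a proposed final API: LineageReporter, LineageRegistrar, MetricsFallbackReporter, and resolve() are hypothetical names, a Map<String, String> stands in for PipelineOptions, and a plain set stands in for the metrics backend so that the example runs without Beam on the classpath. Only java.util.ServiceLoader is real.

```java
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.TreeSet;

/** Sketch only: every type in this class is hypothetical, not part of the Beam API. */
final class LineagePluginSketch {

  /** Hypothetical destination for lineage segments, supplied by a plugin. */
  interface LineageReporter {
    void add(Iterable<String> segments);
  }

  /** Hypothetical registrar, discovered via ServiceLoader in the style of FileSystemRegistrar. */
  interface LineageRegistrar {
    /** Returns a reporter, or null if the options do not enable this plugin. */
    LineageReporter fromOptions(Map<String, String> options);
  }

  /** Stand-in for the existing metrics-backed behavior: stores the joined segments in a set. */
  static final class MetricsFallbackReporter implements LineageReporter {
    final Set<String> recorded = new TreeSet<>();

    @Override
    public void add(Iterable<String> segments) {
      recorded.add(String.join("", segments));
    }
  }

  /** Picks the first registrar that opts in; falls back when none is present or enabled. */
  static LineageReporter resolve(
      Iterable<LineageRegistrar> registrars, Map<String, String> options) {
    for (LineageRegistrar registrar : registrars) {
      LineageReporter reporter = registrar.fromOptions(options);
      if (reporter != null) {
        return reporter;
      }
    }
    return new MetricsFallbackReporter();
  }

  /** Classpath entry point: registrars are listed under META-INF/services. */
  static LineageReporter resolve(Map<String, String> options) {
    return resolve(ServiceLoader.load(LineageRegistrar.class), options);
  }

  public static void main(String[] args) {
    // With no registrar on the classpath, ServiceLoader yields nothing and the fallback is used.
    LineageReporter reporter = resolve(Map.of("openlineage_enabled", "true"));
    reporter.add(List.of("bigquery:", "project.", "dataset.", "table"));
    System.out.println(reporter.getClass().getSimpleName());
  }
}
```

Under this shape, the existing add() overloads in Lineage could delegate to the resolved reporter, so connectors that already call Lineage.getSources() and Lineage.getSinks() would not need to change. Whether resolution happens once per pipeline or per Lineage instance is left open here.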

Relationship to existing roadmap

This change will serve as a foundation for "[Feature Request]: Integrate Apache Beam with Open Lineage", which is already on the roadmap for Beam 3.0.

Testing strategy

  • Unit tests for the plugin discovery mechanism
  • Integration tests with mock lineage reporters
  • Backward compatibility tests ensuring existing metric-based lineage still works when no plugins are present
  • Cross-runner tests will initially focus on DirectRunner, with cross-runner compatibility expected to be inherited from the existing metrics infrastructure
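A mock lineage reporter for the integration tests listed above could be as small as the following sketch. The LineageReporter interface, RecordingReporter, and reportSource() are hypothetical and only illustrate the shape of such a test; the Kafka-style segments are made-up sample data, and a real test would run a pipeline on DirectRunner instead of calling the reporter directly.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: hypothetical types showing how a mock reporter could be asserted on. */
final class MockLineageReporterSketch {

  /** Hypothetical plugin interface that a custom reporter would implement. */
  interface LineageReporter {
    void add(Iterable<String> segments);
  }

  /** Mock that records every reported entity so a test can assert on it. */
  static final class RecordingReporter implements LineageReporter {
    final List<String> recorded = new ArrayList<>();

    @Override
    public void add(Iterable<String> segments) {
      recorded.add(String.join("", segments));
    }
  }

  /** Stand-in for connector code that reports a source. */
  static void reportSource(LineageReporter reporter) {
    reporter.add(List.of("kafka:", "broker.", "topic"));
  }

  public static void main(String[] args) {
    RecordingReporter mock = new RecordingReporter();
    reportSource(mock);
    if (!mock.recorded.equals(List.of("kafka:broker.topic"))) {
      throw new AssertionError("unexpected lineage: " + mock.recorded);
    }
    System.out.println("mock reporter received: " + mock.recorded);
  }
}
```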

Documentation

  • Update JavaDoc for Lineage class
  • Add developer guide for implementing custom lineage reporters

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
