Description
What would you like to happen?
Current lineage implementation
Currently, the lineage implementation is tightly coupled to Beam Metrics as its storage backend in both the Java and Python SDKs.
The Java static API returns two Lineage instances, one for sources and one for sinks:
Lineage sources = Lineage.getSources();
Lineage sinks = Lineage.getSinks();

The Lineage instance provides several overloads of the add() method, which send lineage data to metrics:
if (MetricsFlag.lineageRollupEnabled()) {
((BoundedTrie) this.metric).add(segments);
} else {
((StringSet) this.metric).add(String.join("", segments));
}

The Lineage class also provides two public static methods to query lineage from metric results:
Set<String> query(MetricResults results, Type type, String truncatedMarker)
Set<String> query(MetricResults results, Type type)

Requested functionality
Add a pluggable lineage-tracking mechanism, discovered via ServiceLoader, that decouples lineage reporting from the core metrics infrastructure and enables flexible observability integrations without changes to the core. Scope: Java SDK (Python SDK in future work).
- This change must preserve the current public APIs.
- Minimize changes to I/O connectors that produce lineage; isolate changes to the org.apache.beam.sdk.metrics.Lineage class.
- Use a plugin approach via ServiceLoader discovery, following the existing pattern of FileSystemRegistrar. The key advantage is that registrars on the classpath can read PipelineOptions and enable or disable themselves based on their parameters. This satisfies the approach described in the Open Lineage ticket:
options = PipelineOptions([
    '--openlineage_enabled=true',
])
- Do not provide any concrete plugin implementations. If no plugins are available, fall back to the existing metric-based lineage approach.
- Unfortunately, the static query methods expose MetricResults as an implementation detail. Leave them as is; they are out of scope for this change.
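A minimal sketch of how ServiceLoader discovery and the metrics fallback could fit together, assuming a hypothetical LineageRegistrar service interface (loosely mirroring FileSystemRegistrar.fromOptions) and a hypothetical LineageReporter with a single add method. Options are modeled as a plain Map<String, String> instead of PipelineOptions so the example runs without Beam, and MetricsFallbackReporter only stands in for the existing StringSet/BoundedTrie path. None of these names are existing Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

public class LineageDiscoverySketch {

  /** Hypothetical plugin-facing reporter; not an existing Beam interface. */
  public interface LineageReporter {
    void add(Iterable<String> segments);
  }

  /**
   * Hypothetical registrar discovered through ServiceLoader. Like FileSystemRegistrar,
   * it receives the options and decides whether to participate.
   */
  public interface LineageRegistrar {
    Optional<LineageReporter> fromOptions(Map<String, String> options);
  }

  /** Stand-in for today's metrics-backed path (StringSet when rollup is disabled). */
  static final class MetricsFallbackReporter implements LineageReporter {
    final List<String> recorded = new ArrayList<>();

    @Override
    public void add(Iterable<String> segments) {
      recorded.add(String.join("", segments));
    }
  }

  /** Collects every enabled plugin reporter; falls back to metrics if none is enabled. */
  static List<LineageReporter> discover(Map<String, String> options, ClassLoader loader) {
    List<LineageReporter> reporters = new ArrayList<>();
    for (LineageRegistrar registrar : ServiceLoader.load(LineageRegistrar.class, loader)) {
      registrar.fromOptions(options).ifPresent(reporters::add);
    }
    if (reporters.isEmpty()) {
      reporters.add(new MetricsFallbackReporter());
    }
    return reporters;
  }

  public static void main(String[] args) {
    // No META-INF/services entry is registered for LineageRegistrar in this sketch,
    // so discovery finds no plugins and the metrics fallback is selected.
    List<LineageReporter> reporters =
        discover(Map.of("openlineage_enabled", "true"), LineageDiscoverySketch.class.getClassLoader());
    for (LineageReporter reporter : reporters) {
      reporter.add(List.of("bigquery:", "project.dataset.table"));
    }
    System.out.println(reporters.get(0).getClass().getSimpleName());
  }
}
```

Because no provider-configuration file is on the classpath here, ServiceLoader finds no registrars and lineage goes to the fallback, which is the backward-compatible behavior the request asks for. Collecting all enabled reporters rather than picking one is a choice made for this sketch; the issue does not specify how multiple plugins should interact.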
Relationship to existing roadmap
This change will serve as a foundation for [Feature Request]: Integrate Apache Beam with Open Lineage, which is already on the roadmap for Beam 3.0.
Testing strategy
- Unit tests for the plugin discovery mechanism
- Integration tests with mock lineage reporters
- Backward compatibility tests ensuring existing metric-based lineage still works when no plugins are present
- Runner tests will initially focus on the DirectRunner; compatibility with other runners is expected to be inherited from the existing metrics infrastructure.
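A sketch of what a plugin-discovery unit test with a mock reporter might look like, assuming a hypothetical LineageRegistrar/LineageReporter service interface and a hypothetical mock_lineage_enabled option (none of these exist in Beam). Instead of shipping a JAR, it writes a ServiceLoader provider-configuration file (META-INF/services/ followed by the service's binary name) into a temporary directory and exposes it through a URLClassLoader, so the test controls exactly which plugins are visible.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

public class LineagePluginTestSketch {

  // Hypothetical SPI: not part of Beam.
  public interface LineageReporter {
    void add(String fqn);
  }

  public interface LineageRegistrar {
    Optional<LineageReporter> fromOptions(Map<String, String> options);
  }

  /** Mock reporter that records every lineage entry it receives. */
  public static final class RecordingReporter implements LineageReporter {
    final List<String> recorded = new ArrayList<>();

    @Override
    public void add(String fqn) {
      recorded.add(fqn);
    }
  }

  /** Mock registrar, enabled only when mock_lineage_enabled=true. Needs a public no-arg constructor. */
  public static final class MockRegistrar implements LineageRegistrar {
    public MockRegistrar() {}

    @Override
    public Optional<LineageReporter> fromOptions(Map<String, String> options) {
      return "true".equals(options.get("mock_lineage_enabled"))
          ? Optional.of(new RecordingReporter())
          : Optional.empty();
    }
  }

  /** Builds a class loader whose META-INF/services entry registers MockRegistrar. */
  static ClassLoader loaderWithMockRegistrar() {
    try {
      Path root = Files.createTempDirectory("lineage-spi");
      Path services = Files.createDirectories(root.resolve("META-INF").resolve("services"));
      // The file name is the service's binary name; each line names a provider class.
      Files.write(
          services.resolve(LineageRegistrar.class.getName()),
          List.of(MockRegistrar.class.getName()),
          StandardCharsets.UTF_8);
      return new URLClassLoader(
          new URL[] {root.toUri().toURL()}, LineagePluginTestSketch.class.getClassLoader());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Returns the reporters of all registrars that are enabled for the given options. */
  static List<LineageReporter> discover(Map<String, String> options, ClassLoader loader) {
    List<LineageReporter> found = new ArrayList<>();
    for (LineageRegistrar registrar : ServiceLoader.load(LineageRegistrar.class, loader)) {
      registrar.fromOptions(options).ifPresent(found::add);
    }
    return found;
  }

  public static void main(String[] args) {
    ClassLoader loader = loaderWithMockRegistrar();
    List<LineageReporter> enabled = discover(Map.of("mock_lineage_enabled", "true"), loader);
    List<LineageReporter> disabled = discover(Map.of(), loader);
    enabled.forEach(r -> r.add("kafka:my-topic"));
    RecordingReporter mock = (RecordingReporter) enabled.get(0);
    System.out.println(
        enabled.size() + " enabled, " + disabled.size() + " disabled, recorded " + mock.recorded);
  }
}
```

A real test would likely use JUnit and exercise the Lineage class itself; the plain main method here only keeps the example dependency-free.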
Documentation
- Update the Javadoc for the Lineage class.
- Add a developer guide for implementing custom lineage reporters.
Issue Priority
Priority: 2 (default / most feature requests should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner