feat(taps): Introduced method in Tap class to convert input catalog to Stream objects, without discovery#3324
feat(taps): Introduced method in Tap class to convert input catalog to Stream objects, without discovery#3324edgarrmondragon wants to merge 3 commits intomainfrom
Tap class to convert input catalog to Stream objects, without discovery#3324Conversation
Reviewer's GuideThis PR refactors stream loading in Tap to distinguish between discovery and catalog-driven workflows by introducing a new load_streams_from_catalog method and branching logic in streams(), adds a helper to check stream selection in Catalog, and updates TYPE_CHECKING imports for Iterable. Sequence diagram for Tap.streams() branching logicsequenceDiagram
participant Tap
participant Catalog
participant Stream
Tap->>Tap: streams()
alt input_catalog is None
Tap->>Tap: load_streams()
Tap->>Stream: create Stream objects
Tap->>Tap: store streams by name
else input_catalog is provided
Tap->>Tap: load_streams_from_catalog(input_catalog)
Tap->>Stream: create Stream objects from catalog
Tap->>Stream: apply_catalog(input_catalog)
Tap->>Tap: store streams by name
end
Class diagram for updated Tap class and CatalogclassDiagram
class Tap {
+streams: dict[str, Stream]
+load_streams() list[Stream]
+load_streams_from_catalog(catalog: Catalog) Iterable[Stream]
}
class Catalog {
+get_stream(stream_id: str) CatalogEntry | None
+has_stream_selected(stream_id: str) bool
}
Tap --> Catalog : uses
Catalog --> CatalogEntry : returns
Class diagram for new has_stream_selected method in CatalogclassDiagram
class Catalog {
+has_stream_selected(stream_id: str) bool
}
class CatalogEntry {
+metadata
}
Catalog --> CatalogEntry : get_stream(stream_id)
CatalogEntry --> Metadata : metadata
class Metadata {
+resolve_selection() mask
}
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
Tap class to convert input catalog to Stream objects, without discoveryTap class to convert input catalog to Stream objects, without discovery
There was a problem hiding this comment.
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `singer_sdk/tap_base.py:414-425` </location>
<code_context>
reverse=False,
)
+ def load_streams_from_catalog(self, catalog: Catalog) -> Iterable[Stream]: # noqa: ARG002
+ """Recreate streams from the input catalog.
+
+ Returns:
+ Iterable[Stream]: An iterable of Stream objects recreated from the input
+ catalog.
+ For backwards compatibility calls `self.load_streams()`
+
+ Returns:
+ A mapping of names to streams, using provided catalog.
+ """
+ return self.load_streams()
+
# Bookmarks and state management
</code_context>
<issue_to_address>
**suggestion:** The new method load_streams_from_catalog currently ignores the catalog argument.
Consider updating the implementation to use the catalog argument, or clarify in the docstring that it is currently unused.
```suggestion
def load_streams_from_catalog(self, catalog: Catalog) -> Iterable[Stream]: # noqa: ARG002
"""Recreate streams from the input catalog.
Note:
The `catalog` argument is currently unused. For backwards compatibility,
this method calls `self.load_streams()`.
Returns:
Iterable[Stream]: An iterable of Stream objects recreated from the input
catalog.
"""
return self.load_streams()
```
</issue_to_address>
### Comment 2
<location> `singer_sdk/tap_base.py:168-172` </location>
<code_context>
-
- for stream in self.load_streams():
- if input_catalog is not None:
+ if input_catalog is None:
+ self._streams = {stream.name: stream for stream in self.load_streams()}
+ else:
+ self._streams = {}
+ for stream in self.load_streams_from_catalog(input_catalog):
stream.apply_catalog(input_catalog)
- self._streams[stream.name] = stream
</code_context>
<issue_to_address>
**suggestion:** The logic for stream loading now depends on input_catalog, but load_streams_from_catalog currently delegates to load_streams.
If catalog-based filtering is intended, implement it in load_streams_from_catalog or clarify the expected override behavior in documentation.
</issue_to_address>
### Comment 3
<location> `singer_sdk/singerlib/catalog.py:436-445` </location>
<code_context>
"""
return self.get(stream_id)
+
+ def has_stream_selected(self, stream_id: str) -> bool:
+ """Returns true if the stream is selected.
+
+ Args:
+ stream_id: The ID/name of the stream.
+ """
+ if stream := self.get_stream(stream_id):
+ mask = stream.metadata.resolve_selection()
+ return mask[()]
+
+ return False
</code_context>
<issue_to_address>
**issue (bug_risk):** has_stream_selected may raise if resolve_selection returns a non-dict or mask is missing the expected key.
Add checks to confirm resolve_selection returns a dict and mask contains the expected key to prevent runtime errors from malformed input.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| def has_stream_selected(self, stream_id: str) -> bool: | ||
| """Returns true if the stream is selected. | ||
|
|
||
| Args: | ||
| stream_id: The ID/name of the stream. | ||
| """ | ||
| if stream := self.get_stream(stream_id): | ||
| mask = stream.metadata.resolve_selection() | ||
| return mask[()] | ||
|
|
There was a problem hiding this comment.
issue (bug_risk): has_stream_selected may raise if resolve_selection returns a non-dict or mask is missing the expected key.
Add checks to confirm resolve_selection returns a dict and mask contains the expected key to prevent runtime errors from malformed input.
Documentation build overview
Show files changed (3 files in total): 📝 3 modified | ➕ 0 added | ➖ 0 deleted
|
2f61b57 to
923687d
Compare
CodSpeed Performance ReportMerging #3324 will not alter performanceComparing Summary
Footnotes |
Codecov Report❌ Patch coverage is
❌ Your patch check has failed because the patch coverage (66.66%) is below the target coverage (100.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## main #3324 +/- ##
==========================================
- Coverage 93.78% 93.72% -0.07%
==========================================
Files 69 69
Lines 5758 5766 +8
Branches 713 714 +1
==========================================
+ Hits 5400 5404 +4
- Misses 254 258 +4
Partials 104 104
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
ead4d7b to
3f07ba1
Compare
3f07ba1 to
4d6bcf6
Compare
…to `Stream` objects, without discovery This clarifies whether the tap should perform discovery or use the input catalog, the two being mutually exclusive. Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> Co-authored-by: Rafal Krupinski <low.map7277@fastmail.com> Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com> Co-authored-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com> Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
4d6bcf6 to
be2d51f
Compare
This clarifies whether the tap should perform discovery or use the input catalog, the two being mutually exclusive.
Supersedes #3084
Summary by Sourcery
Provide a clear separation between discovery and catalog-driven stream loading by adding load_streams_from_catalog and updating streams property, and enhance Catalog with a has_stream_selected helper
New Features:
Enhancements: