-
Notifications
You must be signed in to change notification settings - Fork 7
Asset tag bulk management #790
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
e35a003
6b430f5
9c11ff6
e1185ba
d9c322a
f357ad2
b56abb0
187280d
cb5d8e1
4625aee
f82b97b
a629029
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -675,6 +675,54 @@ def process_direct_api_response( | |
|
|
||
|
|
||
| class Save: | ||
| @staticmethod | ||
| def _process_tags_by_semantic(asset: Asset) -> Asset: | ||
| """ | ||
| Process tags in an asset by moving them to the appropriate lists | ||
| based on their semantic value. | ||
|
|
||
| :param asset: asset to process | ||
| :returns: processed asset | ||
| """ | ||
| # Lists to collect tags by semantic | ||
| append_tags = [] | ||
| remove_tags = [] | ||
| replace_tags = [] | ||
|
|
||
| # Process atlan_tags | ||
| if hasattr(asset, "atlan_tags") and asset.atlan_tags: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤔 CONSIDER ✅ CORRECTNESS Confidence: 80.0% Issue: Edge case handling gap: The code processes tags from atlan_tags that have semantic values, but doesn't validate if having semantic tags in atlan_tags is the intended usage pattern according to business requirements. Additionally, repeated hasattr() checks for 'semantic' attribute create code duplication. Recommendation: Clarify business requirements: Should tags with explicit semantics be allowed in the generic atlan_tags field, or should they only be used through the specific classification fields? Extract a helper function like _get_tag_semantic(tag) -> Optional[SaveSemantic] to centralize semantic attribute access. |
||
| for tag in asset.atlan_tags: | ||
| if hasattr(tag, "semantic") and tag.semantic == SaveSemantic.APPEND: | ||
| append_tags.append(tag) | ||
| elif hasattr(tag, "semantic") and tag.semantic == SaveSemantic.REMOVE: | ||
| remove_tags.append(tag) | ||
| else: | ||
| # REPLACE or None | ||
| replace_tags.append(tag) | ||
|
|
||
| # Process add_or_update_classifications | ||
| if ( | ||
| hasattr(asset, "add_or_update_classifications") | ||
| and asset.add_or_update_classifications | ||
| ): | ||
| for tag in asset.add_or_update_classifications: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🚨 CRITICAL ✅ CORRECTNESS Confidence: 95.0% Issue: Business logic violation: Tags with REPLACE semantic in add_or_update_classifications are incorrectly processed as APPEND operations. This breaks the semantic meaning where REPLACE should replace all tags on the asset, not append to existing ones. Recommendation: Add explicit handling for SaveSemantic.REPLACE in add_or_update_classifications processing. REPLACE semantic tags should be moved to replace_tags list, not append_tags. |
||
| if hasattr(tag, "semantic") and tag.semantic == SaveSemantic.REMOVE: | ||
| remove_tags.append(tag) | ||
| else: | ||
| # APPEND or None - both go to add_or_update | ||
| append_tags.append(tag) | ||
|
|
||
| # Process remove_classifications | ||
| if hasattr(asset, "remove_classifications") and asset.remove_classifications: | ||
| remove_tags.extend(asset.remove_classifications) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Confidence: 85.0% Issue: Potential semantic inconsistency: Tags in remove_classifications are processed without checking their semantic field. If a tag has APPEND or REPLACE semantic but is placed in remove_classifications, it will still be removed, potentially violating user intent. Recommendation: Add semantic validation for tags in remove_classifications. Consider warning or error when tags have conflicting semantics vs their placement in specific classification fields. |
||
|
|
||
| # Set the processed tags back on the asset | ||
| asset.atlan_tags = replace_tags if replace_tags else None | ||
| asset.add_or_update_classifications = append_tags if append_tags else None | ||
| asset.remove_classifications = remove_tags if remove_tags else None | ||
|
|
||
| return asset | ||
|
|
||
| @staticmethod | ||
| def prepare_request( | ||
| entity: Union[Asset, List[Asset]], | ||
|
|
@@ -695,13 +743,6 @@ def prepare_request( | |
| :param client: the Atlan client instance for flushing custom metadata | ||
| :returns: tuple of (query_params, bulk_request) | ||
| """ | ||
| query_params = { | ||
| "replaceTags": replace_atlan_tags, | ||
| "appendTags": append_atlan_tags, | ||
| "replaceBusinessAttributes": replace_custom_metadata, | ||
| "overwriteBusinessAttributes": overwrite_custom_metadata, | ||
| } | ||
|
|
||
| entities: List[Asset] = [] | ||
| if isinstance(entity, list): | ||
| entities.extend(entity) | ||
|
|
@@ -712,9 +753,45 @@ def prepare_request( | |
| raise ValueError( | ||
| "AtlanClient instance must be provided to validate and flush cm for assets." | ||
| ) | ||
| # Validate and flush entities BEFORE creating the BulkRequest | ||
|
|
||
| # Validate and flush entities BEFORE processing | ||
| Save.validate_and_flush_entities(entities, client) | ||
| return query_params, BulkRequest[Asset](entities=entities) | ||
|
|
||
| # Process tags by semantic for each asset | ||
| processed_entities = [] | ||
| has_replace_semantic = False | ||
| has_append_or_remove_semantic = False | ||
|
|
||
| for asset in entities: | ||
| processed_asset = Save._process_tags_by_semantic(asset) | ||
| processed_entities.append(processed_asset) | ||
|
|
||
| # Check if any tags have REPLACE semantic (will be in atlan_tags) | ||
| if processed_asset.atlan_tags: | ||
| has_replace_semantic = True | ||
|
|
||
| # Check if any tags have APPEND/REMOVE semantic | ||
| if ( | ||
| processed_asset.add_or_update_classifications | ||
| or processed_asset.remove_classifications | ||
| ): | ||
| has_append_or_remove_semantic = True | ||
|
|
||
| # Build query parameters based on semantic usage | ||
| query_params = { | ||
| "replaceBusinessAttributes": replace_custom_metadata, | ||
| "overwriteBusinessAttributes": overwrite_custom_metadata, | ||
| } | ||
|
|
||
| # When REPLACE semantic is used, use replaceClassifications | ||
| if has_replace_semantic or replace_atlan_tags: | ||
| query_params["replaceClassifications"] = True | ||
| query_params["appendTags"] = False | ||
| # When APPEND/REMOVE semantic is used, use appendTags | ||
| elif has_append_or_remove_semantic or append_atlan_tags: | ||
| query_params["appendTags"] = True | ||
|
|
||
| return query_params, BulkRequest[Asset](entities=processed_entities) | ||
|
|
||
| @staticmethod | ||
| async def prepare_request_async( | ||
|
|
@@ -736,13 +813,6 @@ async def prepare_request_async( | |
| :param client: Optional[AsyncAtlanClient] = None, | ||
| :returns: tuple of (query_params, bulk_request) | ||
| """ | ||
| query_params = { | ||
| "replaceTags": replace_atlan_tags, | ||
| "appendTags": append_atlan_tags, | ||
| "replaceBusinessAttributes": replace_custom_metadata, | ||
| "overwriteBusinessAttributes": overwrite_custom_metadata, | ||
| } | ||
|
|
||
| entities: List[Asset] = [] | ||
| if isinstance(entity, list): | ||
| entities.extend(entity) | ||
|
|
@@ -753,9 +823,45 @@ async def prepare_request_async( | |
| raise ValueError( | ||
| "AsyncAtlanClient instance must be provided to validate and flush cm for assets." | ||
| ) | ||
| # Validate and flush entities BEFORE creating the BulkRequest | ||
|
|
||
| # Validate and flush entities BEFORE processing | ||
| await Save.validate_and_flush_entities_async(entities, client) | ||
| return query_params, BulkRequest[Asset](entities=entities) | ||
|
|
||
| # Process tags by semantic for each asset | ||
| processed_entities = [] | ||
| has_replace_semantic = False | ||
| has_append_or_remove_semantic = False | ||
|
|
||
| for asset in entities: | ||
| processed_asset = Save._process_tags_by_semantic(asset) | ||
| processed_entities.append(processed_asset) | ||
|
|
||
| # Check if any tags have REPLACE semantic (will be in atlan_tags) | ||
| if processed_asset.atlan_tags: | ||
| has_replace_semantic = True | ||
|
|
||
| # Check if any tags have APPEND/REMOVE semantic | ||
| if ( | ||
| processed_asset.add_or_update_classifications | ||
| or processed_asset.remove_classifications | ||
| ): | ||
| has_append_or_remove_semantic = True | ||
|
|
||
| # Build query parameters based on semantic usage | ||
| query_params = { | ||
| "replaceBusinessAttributes": replace_custom_metadata, | ||
| "overwriteBusinessAttributes": overwrite_custom_metadata, | ||
| } | ||
|
|
||
| # When REPLACE semantic is used, use replaceClassifications | ||
| if has_replace_semantic or replace_atlan_tags: | ||
| query_params["replaceClassifications"] = True | ||
| query_params["appendTags"] = False | ||
| # When APPEND/REMOVE semantic is used, use appendTags | ||
| elif has_append_or_remove_semantic or append_atlan_tags: | ||
| query_params["appendTags"] = True | ||
|
|
||
| return query_params, BulkRequest[Asset](entities=processed_entities) | ||
|
|
||
| @staticmethod | ||
| def validate_and_flush_entities(entities: List[Asset], client: AtlanClient) -> None: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -323,6 +323,17 @@ class Config: | |
| source_tag_attachments: List[SourceTagAttachment] = Field( | ||
| default_factory=list, exclude=True | ||
| ) | ||
| semantic: Optional[SaveSemantic] = Field( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Confidence: 75.0% Issue: Documentation inconsistency: The field description states REPLACE is 'default behavior' but the actual default value is None, which gets treated as REPLACE in processing logic. This could confuse users about the true default behavior. Additionally, the semantic field documentation could be more concise and focused. Recommendation: Update documentation to clarify that None semantic is treated as REPLACE behavior, or consider making SaveSemantic.REPLACE the actual default value instead of None. Consider shortening the description while maintaining clarity, potentially moving detailed examples to class-level docstring. |
||
| default=None, | ||
| exclude=True, | ||
| description=( | ||
| "Semantic for how this Atlan tag should be saved, " | ||
| "if used in an asset request on which `.save()` is called. " | ||
| "APPEND: add the tag if it doesn't exist, or update it if it does. " | ||
| "REMOVE: remove the tag if it exists. " | ||
| "REPLACE: replace all existing tags on the asset (default behavior)." | ||
| ), | ||
| ) | ||
|
|
||
| attributes: Optional[Dict[str, Any]] = None | ||
| tag_id: Optional[str] = Field(default=None, exclude=True) | ||
|
|
@@ -334,6 +345,7 @@ def of( | |
| entity_guid: Optional[str] = None, | ||
| source_tag_attachment: Optional[SourceTagAttachment] = None, | ||
| client: Optional[AtlanClient] = None, | ||
| semantic: Optional[SaveSemantic] = None, | ||
| ) -> AtlanTag: | ||
| """ | ||
| Construct an Atlan tag assignment for a specific entity. | ||
|
|
@@ -342,10 +354,11 @@ def of( | |
| :param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned | ||
| :param source_tag_attachment: (optional) source-specific details for the tag | ||
| :param client: (optional) client instance used for translating source-specific details | ||
| :param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE) | ||
| :return: an Atlan tag assignment with default settings for propagation and a specific entity assignment | ||
| :raises InvalidRequestError: if client is not provided and source_tag_attachment is specified | ||
| """ | ||
| tag = AtlanTag(type_name=atlan_tag_name) # type: ignore[call-arg] | ||
| tag = AtlanTag(type_name=atlan_tag_name, semantic=semantic) # type: ignore[call-arg] | ||
| if entity_guid: | ||
| tag.entity_guid = entity_guid | ||
| tag.entity_status = EntityStatus.ACTIVE | ||
|
|
@@ -367,6 +380,7 @@ async def of_async( | |
| entity_guid: Optional[str] = None, | ||
| source_tag_attachment: Optional[SourceTagAttachment] = None, | ||
| client: Optional[AsyncAtlanClient] = None, | ||
| semantic: Optional[SaveSemantic] = None, | ||
| ) -> AtlanTag: | ||
| """ | ||
| Async version of AtlanTag.of() for use with AsyncAtlanClient. | ||
|
|
@@ -377,10 +391,11 @@ async def of_async( | |
| :param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned | ||
| :param source_tag_attachment: (optional) source-specific details for the tag | ||
| :param client: (optional) async client instance used for translating source-specific details | ||
| :param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE) | ||
| :return: an Atlan tag assignment with default settings for propagation and a specific entity assignment | ||
| :raises InvalidRequestError: if client is not provided and source_tag_attachment is specified | ||
| """ | ||
| tag = AtlanTag(type_name=atlan_tag_name) # type: ignore[call-arg] | ||
| tag = AtlanTag(type_name=atlan_tag_name, semantic=semantic) # type: ignore[call-arg] | ||
| if entity_guid: | ||
| tag.entity_guid = entity_guid | ||
| tag.entity_status = EntityStatus.ACTIVE | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1 @@ | ||
| 8.4.5 | ||
| 8.4.6 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Confidence: 90.0%
Issue: The _process_tags_by_semantic method has high cyclomatic complexity with nested conditions and multiple hasattr checks. This reduces code readability and testability.
Recommendation: Extract helper methods for tag processing logic: _process_atlan_tags(), _process_add_or_update_classifications(), and _process_remove_classifications() to improve readability and testability