Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
## 8.4.6 (January 13, 2025)

### New Features

- Added support for OAuth client management (`create`, `get`, `get_by_id`, `update`, `purge`).
- Added optional `field_type` parameter to `SearchableField.has_any_value()`, allowing users to handle cases when text exceeds **5K characters** (the keyword field on an attribute can be empty while the text field on the same attribute is populated).

### QOL Improvements

- Regenerated models with latest typedef definitions.
- Published `pyatlan` with Chainguard golden image for each new release.

## 8.4.5 (December 15, 2025)

### QOL Improvements
Expand Down
142 changes: 124 additions & 18 deletions pyatlan/client/common/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,54 @@ def process_direct_api_response(


class Save:
@staticmethod
def _process_tags_by_semantic(asset: Asset) -> Asset:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ IMPORTANT 🔧 MAINTAINABILITY

Confidence: 90.0%

Issue: The _process_tags_by_semantic method has high cyclomatic complexity with nested conditions and multiple hasattr checks. This reduces code readability and testability.

Recommendation: Extract helper methods for tag processing logic: _process_atlan_tags(), _process_add_or_update_classifications(), and _process_remove_classifications() to improve readability and testability

"""
Process tags in an asset by moving them to the appropriate lists
based on their semantic value.

:param asset: asset to process
:returns: processed asset
"""
# Lists to collect tags by semantic
append_tags = []
remove_tags = []
replace_tags = []

# Process atlan_tags
if hasattr(asset, "atlan_tags") and asset.atlan_tags:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 CONSIDERCORRECTNESS

Confidence: 80.0%

Issue: Edge case handling gap: The code processes tags from atlan_tags that have semantic values, but doesn't validate if having semantic tags in atlan_tags is the intended usage pattern according to business requirements. Additionally, repeated hasattr() checks for 'semantic' attribute create code duplication.

Recommendation: Clarify business requirements: Should tags with explicit semantics be allowed in the generic atlan_tags field, or should they only be used through the specific classification fields? Extract a helper function like _get_tag_semantic(tag) -> Optional[SaveSemantic] to centralize semantic attribute access.

for tag in asset.atlan_tags:
if hasattr(tag, "semantic") and tag.semantic == SaveSemantic.APPEND:
append_tags.append(tag)
elif hasattr(tag, "semantic") and tag.semantic == SaveSemantic.REMOVE:
remove_tags.append(tag)
else:
# REPLACE or None
replace_tags.append(tag)

# Process add_or_update_classifications
if (
hasattr(asset, "add_or_update_classifications")
and asset.add_or_update_classifications
):
for tag in asset.add_or_update_classifications:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚨 CRITICALCORRECTNESS

Confidence: 95.0%

Issue: Business logic violation: Tags with REPLACE semantic in add_or_update_classifications are incorrectly processed as APPEND operations. This breaks the semantic meaning where REPLACE should replace all tags on the asset, not append to existing ones.

Recommendation: Add explicit handling for SaveSemantic.REPLACE in add_or_update_classifications processing. REPLACE semantic tags should be moved to replace_tags list, not append_tags.

if hasattr(tag, "semantic") and tag.semantic == SaveSemantic.REMOVE:
remove_tags.append(tag)
else:
# APPEND or None - both go to add_or_update
append_tags.append(tag)

# Process remove_classifications
if hasattr(asset, "remove_classifications") and asset.remove_classifications:
remove_tags.extend(asset.remove_classifications)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ IMPORTANTCORRECTNESS

Confidence: 85.0%

Issue: Potential semantic inconsistency: Tags in remove_classifications are processed without checking their semantic field. If a tag has APPEND or REPLACE semantic but is placed in remove_classifications, it will still be removed, potentially violating user intent.

Recommendation: Add semantic validation for tags in remove_classifications. Consider warning or error when tags have conflicting semantics vs their placement in specific classification fields.


# Set the processed tags back on the asset
asset.atlan_tags = replace_tags if replace_tags else None
asset.add_or_update_classifications = append_tags if append_tags else None
asset.remove_classifications = remove_tags if remove_tags else None

return asset

@staticmethod
def prepare_request(
entity: Union[Asset, List[Asset]],
Expand All @@ -695,13 +743,6 @@ def prepare_request(
:param client: the Atlan client instance for flushing custom metadata
:returns: tuple of (query_params, bulk_request)
"""
query_params = {
"replaceTags": replace_atlan_tags,
"appendTags": append_atlan_tags,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}

entities: List[Asset] = []
if isinstance(entity, list):
entities.extend(entity)
Expand All @@ -712,9 +753,45 @@ def prepare_request(
raise ValueError(
"AtlanClient instance must be provided to validate and flush cm for assets."
)
# Validate and flush entities BEFORE creating the BulkRequest

# Validate and flush entities BEFORE processing
Save.validate_and_flush_entities(entities, client)
return query_params, BulkRequest[Asset](entities=entities)

# Process tags by semantic for each asset
processed_entities = []
has_replace_semantic = False
has_append_or_remove_semantic = False

for asset in entities:
processed_asset = Save._process_tags_by_semantic(asset)
processed_entities.append(processed_asset)

# Check if any tags have REPLACE semantic (will be in atlan_tags)
if processed_asset.atlan_tags:
has_replace_semantic = True

# Check if any tags have APPEND/REMOVE semantic
if (
processed_asset.add_or_update_classifications
or processed_asset.remove_classifications
):
has_append_or_remove_semantic = True

# Build query parameters based on semantic usage
query_params = {
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}

# When REPLACE semantic is used, use replaceClassifications
if has_replace_semantic or replace_atlan_tags:
query_params["replaceClassifications"] = True
query_params["appendTags"] = False
# When APPEND/REMOVE semantic is used, use appendTags
elif has_append_or_remove_semantic or append_atlan_tags:
query_params["appendTags"] = True

return query_params, BulkRequest[Asset](entities=processed_entities)

@staticmethod
async def prepare_request_async(
Expand All @@ -736,13 +813,6 @@ async def prepare_request_async(
:param client: Optional[AsyncAtlanClient] = None,
:returns: tuple of (query_params, bulk_request)
"""
query_params = {
"replaceTags": replace_atlan_tags,
"appendTags": append_atlan_tags,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}

entities: List[Asset] = []
if isinstance(entity, list):
entities.extend(entity)
Expand All @@ -753,9 +823,45 @@ async def prepare_request_async(
raise ValueError(
"AsyncAtlanClient instance must be provided to validate and flush cm for assets."
)
# Validate and flush entities BEFORE creating the BulkRequest

# Validate and flush entities BEFORE processing
await Save.validate_and_flush_entities_async(entities, client)
return query_params, BulkRequest[Asset](entities=entities)

# Process tags by semantic for each asset
processed_entities = []
has_replace_semantic = False
has_append_or_remove_semantic = False

for asset in entities:
processed_asset = Save._process_tags_by_semantic(asset)
processed_entities.append(processed_asset)

# Check if any tags have REPLACE semantic (will be in atlan_tags)
if processed_asset.atlan_tags:
has_replace_semantic = True

# Check if any tags have APPEND/REMOVE semantic
if (
processed_asset.add_or_update_classifications
or processed_asset.remove_classifications
):
has_append_or_remove_semantic = True

# Build query parameters based on semantic usage
query_params = {
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}

# When REPLACE semantic is used, use replaceClassifications
if has_replace_semantic or replace_atlan_tags:
query_params["replaceClassifications"] = True
query_params["appendTags"] = False
# When APPEND/REMOVE semantic is used, use appendTags
elif has_append_or_remove_semantic or append_atlan_tags:
query_params["appendTags"] = True

return query_params, BulkRequest[Asset](entities=processed_entities)

@staticmethod
def validate_and_flush_entities(entities: List[Asset], client: AtlanClient) -> None:
Expand Down
19 changes: 17 additions & 2 deletions pyatlan/model/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,17 @@ class Config:
source_tag_attachments: List[SourceTagAttachment] = Field(
default_factory=list, exclude=True
)
semantic: Optional[SaveSemantic] = Field(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ IMPORTANTCORRECTNESS

Confidence: 75.0%

Issue: Documentation inconsistency: The field description states REPLACE is 'default behavior' but the actual default value is None, which gets treated as REPLACE in processing logic. This could confuse users about the true default behavior. Additionally, the semantic field documentation could be more concise and focused.

Recommendation: Update documentation to clarify that None semantic is treated as REPLACE behavior, or consider making SaveSemantic.REPLACE the actual default value instead of None. Consider shortening the description while maintaining clarity, potentially moving detailed examples to class-level docstring.

default=None,
exclude=True,
description=(
"Semantic for how this Atlan tag should be saved, "
"if used in an asset request on which `.save()` is called. "
"APPEND: add the tag if it doesn't exist, or update it if it does. "
"REMOVE: remove the tag if it exists. "
"REPLACE: replace all existing tags on the asset (default behavior)."
),
)

attributes: Optional[Dict[str, Any]] = None
tag_id: Optional[str] = Field(default=None, exclude=True)
Expand All @@ -334,6 +345,7 @@ def of(
entity_guid: Optional[str] = None,
source_tag_attachment: Optional[SourceTagAttachment] = None,
client: Optional[AtlanClient] = None,
semantic: Optional[SaveSemantic] = None,
) -> AtlanTag:
"""
Construct an Atlan tag assignment for a specific entity.
Expand All @@ -342,10 +354,11 @@ def of(
:param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned
:param source_tag_attachment: (optional) source-specific details for the tag
:param client: (optional) client instance used for translating source-specific details
:param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE)
:return: an Atlan tag assignment with default settings for propagation and a specific entity assignment
:raises InvalidRequestError: if client is not provided and source_tag_attachment is specified
"""
tag = AtlanTag(type_name=atlan_tag_name) # type: ignore[call-arg]
tag = AtlanTag(type_name=atlan_tag_name, semantic=semantic) # type: ignore[call-arg]
if entity_guid:
tag.entity_guid = entity_guid
tag.entity_status = EntityStatus.ACTIVE
Expand All @@ -367,6 +380,7 @@ async def of_async(
entity_guid: Optional[str] = None,
source_tag_attachment: Optional[SourceTagAttachment] = None,
client: Optional[AsyncAtlanClient] = None,
semantic: Optional[SaveSemantic] = None,
) -> AtlanTag:
"""
Async version of AtlanTag.of() for use with AsyncAtlanClient.
Expand All @@ -377,10 +391,11 @@ async def of_async(
:param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned
:param source_tag_attachment: (optional) source-specific details for the tag
:param client: (optional) async client instance used for translating source-specific details
:param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE)
:return: an Atlan tag assignment with default settings for propagation and a specific entity assignment
:raises InvalidRequestError: if client is not provided and source_tag_attachment is specified
"""
tag = AtlanTag(type_name=atlan_tag_name) # type: ignore[call-arg]
tag = AtlanTag(type_name=atlan_tag_name, semantic=semantic) # type: ignore[call-arg]
if entity_guid:
tag.entity_guid = entity_guid
tag.entity_status = EntityStatus.ACTIVE
Expand Down
2 changes: 1 addition & 1 deletion pyatlan/version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
8.4.5
8.4.6
Loading
Loading