Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions pyatlan/client/aio/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,12 @@ async def save(
"""
Async save method - creates or updates assets based on qualified_name.

When using AtlanTag with semantic values:
- APPEND: adds/updates the tag using addOrUpdateClassifications
- REMOVE: removes the tag using removeClassifications
- REPLACE: replaces all tags on the asset
- None: uses existing logic based on replace_atlan_tags and append_atlan_tags flags

:param entity: one or more assets to save
:param replace_atlan_tags: whether to replace AtlanTags during an update
:param replace_custom_metadata: replaces any custom metadata with non-empty values provided
Expand All @@ -352,7 +358,22 @@ async def save(
:raises AtlanError: on any API communication issue
:raises ApiError: if a connection was created and blocking until policies are synced overruns the retry limit
"""
# Convert entity to list for consistent handling
entities: List[Asset] = []
if isinstance(entity, list):
entities.extend(entity)
else:
entities.append(entity)

# Check if any entity has tags with semantic
if Save.has_tags_with_semantic(entities):
return await self._save_with_tag_semantic(
entities=entities,
replace_custom_metadata=replace_custom_metadata,
overwrite_custom_metadata=overwrite_custom_metadata,
)

# Use existing logic for backward compatibility
query_params, request = await Save.prepare_request_async(
entity=entity,
replace_atlan_tags=replace_atlan_tags,
Expand All @@ -367,6 +388,92 @@ async def save(
await self._wait_for_connections_to_be_created(connections_created)
return response

async def _save_with_tag_semantic(
self,
entities: List[Asset],
replace_custom_metadata: bool = False,
overwrite_custom_metadata: bool = False,
) -> AssetMutationResponse:
"""
Internal async method to handle saving assets with tag semantic values.
Splits entities based on their tag semantics and makes appropriate API calls.

:param entities: list of assets to save
:param replace_custom_metadata: replaces any custom metadata with non-empty values provided
:param overwrite_custom_metadata: overwrites any custom metadata, even with empty values
:returns: merged AssetMutationResponse from all API calls
"""
# Split entities by tag semantic
append_remove_entities, replace_entities, no_semantic_entities = (
Save.split_entities_by_tag_semantic(entities)
)

responses: List[AssetMutationResponse] = []

# Handle APPEND/REMOVE semantic entities
if append_remove_entities:
# Process each entity to set add_or_update_classifications and remove_classifications
for entity in append_remove_entities:
Save.process_asset_for_append_remove_semantic(entity)

# Validate and flush custom metadata
await Save.validate_and_flush_entities_async(
append_remove_entities,
self._client, # type: ignore[arg-type]
)

query_params = {
"replaceTags": False,
"appendTags": True,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}
request = BulkRequest[Asset](entities=append_remove_entities)
raw_json = await self._client._call_api(BULK_UPDATE, query_params, request)
responses.append(Save.process_response(raw_json))

# Handle REPLACE semantic entities
if replace_entities:
# Validate and flush custom metadata
await Save.validate_and_flush_entities_async(replace_entities, self._client) # type: ignore[arg-type]

query_params = {
"replaceTags": True,
"appendTags": False,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}
request = BulkRequest[Asset](entities=replace_entities)
raw_json = await self._client._call_api(BULK_UPDATE, query_params, request)
responses.append(Save.process_response(raw_json))

# Handle no semantic entities (existing logic)
if no_semantic_entities:
# Validate and flush custom metadata
await Save.validate_and_flush_entities_async(
no_semantic_entities,
self._client, # type: ignore[arg-type]
)

query_params = {
"replaceTags": False,
"appendTags": False,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}
request = BulkRequest[Asset](entities=no_semantic_entities)
raw_json = await self._client._call_api(BULK_UPDATE, query_params, request)
responses.append(Save.process_response(raw_json))

# Merge all responses
merged_response = Save.merge_responses(responses)

# Handle connection waiting for any created connections
if connections_created := merged_response.assets_created(Connection):
await self._wait_for_connections_to_be_created(connections_created)

return merged_response

async def _wait_for_connections_to_be_created(self, connections_created):
guids = Save.get_connection_guids_to_wait_for(connections_created)

Expand Down
104 changes: 103 additions & 1 deletion pyatlan/client/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
Table,
View,
)
from pyatlan.model.core import Announcement, AtlanObject, SearchRequest
from pyatlan.model.core import Announcement, AtlanObject, BulkRequest, SearchRequest
from pyatlan.model.custom_metadata import CustomMetadataDict
from pyatlan.model.enums import (
AssetCreationHandling,
Expand Down Expand Up @@ -432,6 +432,12 @@ def save(
If an asset does exist, opertionally overwrites any Atlan tags. Custom metadata will either be
overwritten or merged depending on the options provided.

When using AtlanTag with semantic values:
- APPEND: adds/updates the tag using addOrUpdateClassifications
- REMOVE: removes the tag using removeClassifications
- REPLACE: replaces all tags on the asset
- None: uses existing logic based on replace_atlan_tags and append_atlan_tags flags

:param entity: one or more assets to save
:param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False)
:param replace_custom_metadata: replaces any custom metadata with non-empty values provided
Expand All @@ -441,6 +447,22 @@ def save(
:raises AtlanError: on any API communication issue
:raises ApiError: if a connection was created and blocking until policies are synced overruns the retry limit
"""
# Convert entity to list for consistent handling
entities: List[Asset] = []
if isinstance(entity, list):
entities.extend(entity)
else:
entities.append(entity)

# Check if any entity has tags with semantic
if Save.has_tags_with_semantic(entities):
return self._save_with_tag_semantic(
entities=entities,
replace_custom_metadata=replace_custom_metadata,
overwrite_custom_metadata=overwrite_custom_metadata,
)

# Use existing logic for backward compatibility
query_params, request = Save.prepare_request(
entity=entity,
replace_atlan_tags=replace_atlan_tags,
Expand All @@ -455,6 +477,86 @@ def save(
self._wait_for_connections_to_be_created(connections_created)
return response

def _save_with_tag_semantic(
self,
entities: List[Asset],
replace_custom_metadata: bool = False,
overwrite_custom_metadata: bool = False,
) -> AssetMutationResponse:
"""
Internal method to handle saving assets with tag semantic values.
Splits entities based on their tag semantics and makes appropriate API calls.

:param entities: list of assets to save
:param replace_custom_metadata: replaces any custom metadata with non-empty values provided
:param overwrite_custom_metadata: overwrites any custom metadata, even with empty values
:returns: merged AssetMutationResponse from all API calls
"""
# Split entities by tag semantic
append_remove_entities, replace_entities, no_semantic_entities = (
Save.split_entities_by_tag_semantic(entities)
)

responses: List[AssetMutationResponse] = []

# Handle APPEND/REMOVE semantic entities
if append_remove_entities:
# Process each entity to set add_or_update_classifications and remove_classifications
for entity in append_remove_entities:
Save.process_asset_for_append_remove_semantic(entity)

# Validate and flush custom metadata
Save.validate_and_flush_entities(append_remove_entities, self._client) # type: ignore[arg-type]

query_params = {
"replaceTags": False,
"appendTags": True,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}
request = BulkRequest[Asset](entities=append_remove_entities)
raw_json = self._client._call_api(BULK_UPDATE, query_params, request)
responses.append(Save.process_response(raw_json))

# Handle REPLACE semantic entities
if replace_entities:
# Validate and flush custom metadata
Save.validate_and_flush_entities(replace_entities, self._client) # type: ignore[arg-type]

query_params = {
"replaceTags": True,
"appendTags": False,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}
request = BulkRequest[Asset](entities=replace_entities)
raw_json = self._client._call_api(BULK_UPDATE, query_params, request)
responses.append(Save.process_response(raw_json))

# Handle no semantic entities (existing logic)
if no_semantic_entities:
# Validate and flush custom metadata
Save.validate_and_flush_entities(no_semantic_entities, self._client) # type: ignore[arg-type]

query_params = {
"replaceTags": False,
"appendTags": False,
"replaceBusinessAttributes": replace_custom_metadata,
"overwriteBusinessAttributes": overwrite_custom_metadata,
}
request = BulkRequest[Asset](entities=no_semantic_entities)
raw_json = self._client._call_api(BULK_UPDATE, query_params, request)
responses.append(Save.process_response(raw_json))

# Merge all responses
merged_response = Save.merge_responses(responses)

# Handle connection waiting for any created connections
if connections_created := merged_response.assets_created(Connection):
self._wait_for_connections_to_be_created(connections_created)

return merged_response

def _wait_for_connections_to_be_created(self, connections_created):
guids = Save.get_connection_guids_to_wait_for(connections_created)

Expand Down
Loading
Loading