diff --git a/pyatlan/client/aio/asset.py b/pyatlan/client/aio/asset.py index eb6c582a5..470ad1032 100644 --- a/pyatlan/client/aio/asset.py +++ b/pyatlan/client/aio/asset.py @@ -343,6 +343,12 @@ async def save( """ Async save method - creates or updates assets based on qualified_name. + When using AtlanTag with semantic values: + - APPEND: adds/updates the tag using addOrUpdateClassifications + - REMOVE: removes the tag using removeClassifications + - REPLACE: replaces all tags on the asset + - None: uses existing logic based on replace_atlan_tags and append_atlan_tags flags + :param entity: one or more assets to save :param replace_atlan_tags: whether to replace AtlanTags during an update :param replace_custom_metadata: replaces any custom metadata with non-empty values provided @@ -352,7 +358,22 @@ async def save( :raises AtlanError: on any API communication issue :raises ApiError: if a connection was created and blocking until policies are synced overruns the retry limit """ + # Convert entity to list for consistent handling + entities: List[Asset] = [] + if isinstance(entity, list): + entities.extend(entity) + else: + entities.append(entity) + + # Check if any entity has tags with semantic + if Save.has_tags_with_semantic(entities): + return await self._save_with_tag_semantic( + entities=entities, + replace_custom_metadata=replace_custom_metadata, + overwrite_custom_metadata=overwrite_custom_metadata, + ) + # Use existing logic for backward compatibility query_params, request = await Save.prepare_request_async( entity=entity, replace_atlan_tags=replace_atlan_tags, @@ -367,6 +388,92 @@ async def save( await self._wait_for_connections_to_be_created(connections_created) return response + async def _save_with_tag_semantic( + self, + entities: List[Asset], + replace_custom_metadata: bool = False, + overwrite_custom_metadata: bool = False, + ) -> AssetMutationResponse: + """ + Internal async method to handle saving assets with tag semantic values. + Splits entities based on their tag semantics and makes appropriate API calls. + + :param entities: list of assets to save + :param replace_custom_metadata: replaces any custom metadata with non-empty values provided + :param overwrite_custom_metadata: overwrites any custom metadata, even with empty values + :returns: merged AssetMutationResponse from all API calls + """ + # Split entities by tag semantic + append_remove_entities, replace_entities, no_semantic_entities = ( + Save.split_entities_by_tag_semantic(entities) + ) + + responses: List[AssetMutationResponse] = [] + + # Handle APPEND/REMOVE semantic entities + if append_remove_entities: + # Process each entity to set add_or_update_classifications and remove_classifications + for entity in append_remove_entities: + Save.process_asset_for_append_remove_semantic(entity) + + # Validate and flush custom metadata + await Save.validate_and_flush_entities_async( + append_remove_entities, + self._client, # type: ignore[arg-type] + ) + + query_params = { + "replaceTags": False, + "appendTags": True, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + request = BulkRequest[Asset](entities=append_remove_entities) + raw_json = await self._client._call_api(BULK_UPDATE, query_params, request) + responses.append(Save.process_response(raw_json)) + + # Handle REPLACE semantic entities + if replace_entities: + # Validate and flush custom metadata + await Save.validate_and_flush_entities_async(replace_entities, self._client) # type: ignore[arg-type] + + query_params = { + "replaceTags": True, + "appendTags": False, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + request = BulkRequest[Asset](entities=replace_entities) + raw_json = await self._client._call_api(BULK_UPDATE, query_params, request) + responses.append(Save.process_response(raw_json)) + + # Handle no semantic entities (existing logic) + if no_semantic_entities: + # Validate and flush custom metadata + await Save.validate_and_flush_entities_async( + no_semantic_entities, + self._client, # type: ignore[arg-type] + ) + + query_params = { + "replaceTags": False, + "appendTags": False, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + request = BulkRequest[Asset](entities=no_semantic_entities) + raw_json = await self._client._call_api(BULK_UPDATE, query_params, request) + responses.append(Save.process_response(raw_json)) + + # Merge all responses + merged_response = Save.merge_responses(responses) + + # Handle connection waiting for any created connections + if connections_created := merged_response.assets_created(Connection): + await self._wait_for_connections_to_be_created(connections_created) + + return merged_response + async def _wait_for_connections_to_be_created(self, connections_created): guids = Save.get_connection_guids_to_wait_for(connections_created) diff --git a/pyatlan/client/asset.py b/pyatlan/client/asset.py index cf7667e9a..1bd365a2b 100644 --- a/pyatlan/client/asset.py +++ b/pyatlan/client/asset.py @@ -99,7 +99,7 @@ Table, View, ) -from pyatlan.model.core import Announcement, AtlanObject, SearchRequest +from pyatlan.model.core import Announcement, AtlanObject, BulkRequest, SearchRequest from pyatlan.model.custom_metadata import CustomMetadataDict from pyatlan.model.enums import ( AssetCreationHandling, @@ -432,6 +432,12 @@ def save( If an asset does exist, opertionally overwrites any Atlan tags. Custom metadata will either be overwritten or merged depending on the options provided. + When using AtlanTag with semantic values: + - APPEND: adds/updates the tag using addOrUpdateClassifications + - REMOVE: removes the tag using removeClassifications + - REPLACE: replaces all tags on the asset + - None: uses existing logic based on replace_atlan_tags and append_atlan_tags flags + :param entity: one or more assets to save :param replace_atlan_tags: whether to replace AtlanTags during an update (True) or not (False) :param replace_custom_metadata: replaces any custom metadata with non-empty values provided @@ -441,6 +447,22 @@ def save( :raises AtlanError: on any API communication issue :raises ApiError: if a connection was created and blocking until policies are synced overruns the retry limit """ + # Convert entity to list for consistent handling + entities: List[Asset] = [] + if isinstance(entity, list): + entities.extend(entity) + else: + entities.append(entity) + + # Check if any entity has tags with semantic + if Save.has_tags_with_semantic(entities): + return self._save_with_tag_semantic( + entities=entities, + replace_custom_metadata=replace_custom_metadata, + overwrite_custom_metadata=overwrite_custom_metadata, + ) + + # Use existing logic for backward compatibility query_params, request = Save.prepare_request( entity=entity, replace_atlan_tags=replace_atlan_tags, @@ -455,6 +477,86 @@ def save( self._wait_for_connections_to_be_created(connections_created) return response + def _save_with_tag_semantic( + self, + entities: List[Asset], + replace_custom_metadata: bool = False, + overwrite_custom_metadata: bool = False, + ) -> AssetMutationResponse: + """ + Internal method to handle saving assets with tag semantic values. + Splits entities based on their tag semantics and makes appropriate API calls. + + :param entities: list of assets to save + :param replace_custom_metadata: replaces any custom metadata with non-empty values provided + :param overwrite_custom_metadata: overwrites any custom metadata, even with empty values + :returns: merged AssetMutationResponse from all API calls + """ + # Split entities by tag semantic + append_remove_entities, replace_entities, no_semantic_entities = ( + Save.split_entities_by_tag_semantic(entities) + ) + + responses: List[AssetMutationResponse] = [] + + # Handle APPEND/REMOVE semantic entities + if append_remove_entities: + # Process each entity to set add_or_update_classifications and remove_classifications + for entity in append_remove_entities: + Save.process_asset_for_append_remove_semantic(entity) + + # Validate and flush custom metadata + Save.validate_and_flush_entities(append_remove_entities, self._client) # type: ignore[arg-type] + + query_params = { + "replaceTags": False, + "appendTags": True, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + request = BulkRequest[Asset](entities=append_remove_entities) + raw_json = self._client._call_api(BULK_UPDATE, query_params, request) + responses.append(Save.process_response(raw_json)) + + # Handle REPLACE semantic entities + if replace_entities: + # Validate and flush custom metadata + Save.validate_and_flush_entities(replace_entities, self._client) # type: ignore[arg-type] + + query_params = { + "replaceTags": True, + "appendTags": False, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + request = BulkRequest[Asset](entities=replace_entities) + raw_json = self._client._call_api(BULK_UPDATE, query_params, request) + responses.append(Save.process_response(raw_json)) + + # Handle no semantic entities (existing logic) + if no_semantic_entities: + # Validate and flush custom metadata + Save.validate_and_flush_entities(no_semantic_entities, self._client) # type: ignore[arg-type] + + query_params = { + "replaceTags": False, + "appendTags": False, + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + request = BulkRequest[Asset](entities=no_semantic_entities) + raw_json = self._client._call_api(BULK_UPDATE, query_params, request) + responses.append(Save.process_response(raw_json)) + + # Merge all responses + merged_response = Save.merge_responses(responses) + + # Handle connection waiting for any created connections + if connections_created := merged_response.assets_created(Connection): + self._wait_for_connections_to_be_created(connections_created) + + return merged_response + def _wait_for_connections_to_be_created(self, connections_created): guids = Save.get_connection_guids_to_wait_for(connections_created) diff --git a/pyatlan/client/common/asset.py b/pyatlan/client/common/asset.py index ca265a7e1..42275a86c 100644 --- a/pyatlan/client/common/asset.py +++ b/pyatlan/client/common/asset.py @@ -863,6 +863,182 @@ def process_response_replacing_cm( """ return AssetMutationResponse(**raw_json) + @staticmethod + def has_tags_with_semantic(entities: List[Asset]) -> bool: + """ + Check if any entity has atlan_tags with a semantic value set. + + :param entities: list of assets to check + :returns: True if any entity has tags with semantic set + """ + for entity in entities: + if entity.atlan_tags: + for tag in entity.atlan_tags: + if tag.semantic is not None: + return True + return False + + @staticmethod + def split_entities_by_tag_semantic( + entities: List[Asset], + ) -> tuple[List[Asset], List[Asset], List[Asset]]: + """ + Split entities into three groups based on their tag semantics: + 1. append_remove_entities: assets with APPEND or REMOVE semantic tags + 2. replace_entities: assets with REPLACE semantic tags + 3. no_semantic_entities: assets with no tags OR all tags have None semantic + + An asset goes into append_remove_entities if ANY of its tags have APPEND or REMOVE semantic. + An asset goes into replace_entities if ALL of its tags have REPLACE semantic (and none have APPEND/REMOVE). + An asset goes into no_semantic_entities if it has no tags or all tags have None semantic. + + :param entities: list of assets to split + :returns: tuple of (append_remove_entities, replace_entities, no_semantic_entities) + """ + append_remove_entities: List[Asset] = [] + replace_entities: List[Asset] = [] + no_semantic_entities: List[Asset] = [] + + for entity in entities: + if not entity.atlan_tags: + # No tags - use existing logic + no_semantic_entities.append(entity) + continue + + has_append_or_remove = False + has_replace = False + all_none = True + + for tag in entity.atlan_tags: + if ( + tag.semantic == SaveSemantic.APPEND + or tag.semantic == SaveSemantic.REMOVE + ): + has_append_or_remove = True + all_none = False + elif tag.semantic == SaveSemantic.REPLACE: + has_replace = True + all_none = False + # tag.semantic == None doesn't change any flags + + if all_none: + # All tags have None semantic - use existing logic + no_semantic_entities.append(entity) + elif has_append_or_remove: + # Has APPEND or REMOVE semantic tags + append_remove_entities.append(entity) + elif has_replace: + # Only has REPLACE semantic tags + replace_entities.append(entity) + else: + # Shouldn't reach here, but default to no_semantic + no_semantic_entities.append(entity) + + return append_remove_entities, replace_entities, no_semantic_entities + + @staticmethod + def process_asset_for_append_remove_semantic(entity: Asset) -> Asset: + """ + Process an asset with APPEND/REMOVE semantic tags. + Sets add_or_update_classifications for APPEND tags and + remove_classifications for REMOVE tags. + Clears atlan_tags after processing. + + :param entity: the asset to process + :returns: the processed asset + """ + if not entity.atlan_tags: + return entity + + append_tags: List[AtlanTag] = [] + remove_tags: List[AtlanTag] = [] + + for tag in entity.atlan_tags: + if tag.semantic == SaveSemantic.APPEND: + append_tags.append(tag) + elif tag.semantic == SaveSemantic.REMOVE: + remove_tags.append(tag) + # Tags with None or REPLACE semantic are ignored here + # REPLACE semantic should not appear in append_remove_entities + + if append_tags: + entity.add_or_update_classifications = append_tags + if remove_tags: + entity.remove_classifications = remove_tags + + # Clear atlan_tags since we've moved them to add_or_update_classifications + # and remove_classifications + entity.atlan_tags = None + + return entity + + @staticmethod + def merge_responses( + responses: List[AssetMutationResponse], + ) -> AssetMutationResponse: + """ + Merge multiple AssetMutationResponse objects into a single response. + + :param responses: list of responses to merge + :returns: merged AssetMutationResponse + """ + from pyatlan.model.response import MutatedEntities + + if not responses: + return AssetMutationResponse() + + if len(responses) == 1: + return responses[0] + + merged_guid_assignments: Dict[str, str] = {} + merged_created: List[Asset] = [] + merged_updated: List[Asset] = [] + merged_deleted: List[Asset] = [] + merged_partial_updated: List[Asset] = [] + + for response in responses: + if response.guid_assignments: + merged_guid_assignments.update(response.guid_assignments) + if response.mutated_entities: + if response.mutated_entities.CREATE: + merged_created.extend(response.mutated_entities.CREATE) + if response.mutated_entities.UPDATE: + merged_updated.extend(response.mutated_entities.UPDATE) + if response.mutated_entities.DELETE: + merged_deleted.extend(response.mutated_entities.DELETE) + if response.mutated_entities.PARTIAL_UPDATE: + merged_partial_updated.extend( + response.mutated_entities.PARTIAL_UPDATE + ) + if response.partial_updated_entities: + merged_partial_updated.extend(response.partial_updated_entities) + + mutated_entities = MutatedEntities( + CREATE=merged_created if merged_created else None, + UPDATE=merged_updated if merged_updated else None, + DELETE=merged_deleted if merged_deleted else None, + ) + # Set PARTIAL_UPDATE separately due to alias conflict in MutatedEntities + if merged_partial_updated: + mutated_entities.PARTIAL_UPDATE = merged_partial_updated + + return AssetMutationResponse( + guid_assignments=merged_guid_assignments + if merged_guid_assignments + else None, + mutated_entities=mutated_entities + if ( + merged_created + or merged_updated + or merged_deleted + or merged_partial_updated + ) + else None, + partial_updated_entities=merged_partial_updated + if merged_partial_updated + else None, + ) + class UpdateAsset: @staticmethod diff --git a/pyatlan/model/core.py b/pyatlan/model/core.py index 0a4b89da0..7adb38cda 100644 --- a/pyatlan/model/core.py +++ b/pyatlan/model/core.py @@ -326,6 +326,15 @@ class Config: attributes: Optional[Dict[str, Any]] = None tag_id: Optional[str] = Field(default=None, exclude=True) + semantic: Optional[SaveSemantic] = Field( + default=None, + exclude=True, + description=( + "Semantic for how this Atlan tag should be saved. " + "Use APPEND to add/update tags, REMOVE to remove tags, " + "or REPLACE to replace all tags on the asset." + ), + ) @classmethod def of( @@ -334,6 +343,7 @@ def of( entity_guid: Optional[str] = None, source_tag_attachment: Optional[SourceTagAttachment] = None, client: Optional[AtlanClient] = None, + semantic: Optional[SaveSemantic] = None, ) -> AtlanTag: """ Construct an Atlan tag assignment for a specific entity. @@ -342,6 +352,7 @@ def of( :param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned :param source_tag_attachment: (optional) source-specific details for the tag :param client: (optional) client instance used for translating source-specific details + :param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE) :return: an Atlan tag assignment with default settings for propagation and a specific entity assignment :raises InvalidRequestError: if client is not provided and source_tag_attachment is specified """ @@ -358,6 +369,8 @@ def of( ) tag.attributes = {source_tag_attr_id: [source_tag_attachment]} # type: ignore[dict-item] tag.source_tag_attachments.append(source_tag_attachment) + if semantic: + tag.semantic = semantic return tag @classmethod @@ -367,6 +380,7 @@ async def of_async( entity_guid: Optional[str] = None, source_tag_attachment: Optional[SourceTagAttachment] = None, client: Optional[AsyncAtlanClient] = None, + semantic: Optional[SaveSemantic] = None, ) -> AtlanTag: """ Async version of AtlanTag.of() for use with AsyncAtlanClient. @@ -377,6 +391,7 @@ async def of_async( :param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned :param source_tag_attachment: (optional) source-specific details for the tag :param client: (optional) async client instance used for translating source-specific details + :param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE) :return: an Atlan tag assignment with default settings for propagation and a specific entity assignment :raises InvalidRequestError: if client is not provided and source_tag_attachment is specified """ @@ -393,6 +408,8 @@ async def of_async( ) tag.attributes = {source_tag_attr_id: [source_tag_attachment]} # type: ignore[dict-item] tag.source_tag_attachments.append(source_tag_attachment) + if semantic: + tag.semantic = semantic return tag diff --git a/pyatlan/model/retranslators.py b/pyatlan/model/retranslators.py index f7787b89e..4fcc4ad14 100644 --- a/pyatlan/model/retranslators.py +++ b/pyatlan/model/retranslators.py @@ -70,7 +70,7 @@ def retranslate(self, data: Dict[str, Any]) -> Dict[str, Any]: # Convert classification human-readable name → hash ID for key in self._CLASSIFICATION_NAMES: - if key in data: + if key in data and data[key] is not None: data[key] = [ self.client.atlan_tag_cache.get_id_for_name(str(name)) or DELETED_ for name in data[key] @@ -78,7 +78,7 @@ def retranslate(self, data: Dict[str, Any]) -> Dict[str, Any]: # Convert classification objects human-readable name typeName → hash ID for key in self._CLASSIFICATION_KEYS: - if key in data: + if key in data and data[key] is not None: for classification in data[key]: tag_name = str(classification.get(self._TYPE_NAME)) if tag_name: diff --git a/pyatlan/model/translators.py b/pyatlan/model/translators.py index 1583b25bd..d17975dfb 100644 --- a/pyatlan/model/translators.py +++ b/pyatlan/model/translators.py @@ -74,7 +74,7 @@ def translate(self, data: Dict[str, Any]) -> Dict[str, Any]: # Convert classification hash ID → human-readable name for key in self._CLASSIFICATION_NAMES: - if key in raw_json: + if key in raw_json and raw_json[key] is not None: raw_json[key] = [ self.client.atlan_tag_cache.get_name_for_id(tag_id) or DELETED_ for tag_id in raw_json[key] @@ -82,7 +82,7 @@ def translate(self, data: Dict[str, Any]) -> Dict[str, Any]: # Convert classification objects typeName hash ID → human-readable name for key in self._CLASSIFICATION_KEYS: - if key in raw_json: + if key in raw_json and raw_json[key] is not None: for classification in raw_json[key]: tag_id = classification.get(self._TYPE_NAME) if tag_id: diff --git a/tests/integration/test_save_semantic.py b/tests/integration/test_save_semantic.py new file mode 100644 index 000000000..12d16eb31 --- /dev/null +++ b/tests/integration/test_save_semantic.py @@ -0,0 +1,497 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2026 Atlan Pte. Ltd. + +""" +Integration tests for SaveSemantic feature for in-bulk asset tag management. + +These tests verify APPEND, REMOVE, and REPLACE semantics work correctly +for managing Atlan tags on assets. +""" + +import logging +import time +from typing import Generator, List, NamedTuple, Set + +import pytest + +from pyatlan.client.atlan import AtlanClient +from pyatlan.model.assets import Table +from pyatlan.model.core import AtlanTag, AtlanTagName +from pyatlan.model.enums import SaveSemantic +from pyatlan.model.fluent_search import FluentSearch + +# Table names to search for (in production Snowflake connection) +TABLE1_NAME = "BUYINGGROUPS" +TABLE2_NAME = "CITIES_ARCHIVE" + +# Tag names to use in tests +TAG_ISSUE = "Issue" +TAG_CONFIDENTIAL = "Confidential" + +LOGGER = logging.getLogger(__name__) + + +class TableInfo(NamedTuple): + """Container for table information.""" + + name: str + qualified_name: str + + +def find_table_by_name( + client: AtlanClient, table_name: str, connector_type: str = "snowflake" +) -> TableInfo: + """ + Find a table by name using FluentSearch, filtering by connector type. + + :param client: AtlanClient instance + :param table_name: name of the table to find + :param connector_type: connector type to filter by (default: snowflake) + :returns: TableInfo with name and qualified_name + :raises ValueError: if table not found + """ + results = ( + FluentSearch() + .where(FluentSearch.asset_type(Table)) + .where(Table.NAME.eq(table_name)) + .where(Table.QUALIFIED_NAME.wildcard(f"*/{connector_type}/*")) + .include_on_results(Table.NAME) + .include_on_results(Table.QUALIFIED_NAME) + .page_size(10) + ).execute(client) + + tables = list(results) + if not tables: + raise ValueError( + f"Table '{table_name}' not found in tenant " + f"(connector_type={connector_type})" + ) + + # If multiple tables found, log them and use the first one + if len(tables) > 1: + LOGGER.warning( + f"Multiple {connector_type} tables found with name '{table_name}'. " + f"Using first one. All qualified names: " + f"{[t.qualified_name for t in tables]}" + ) + + table = tables[0] + if not table.qualified_name: + raise ValueError(f"Table '{table_name}' has no qualified_name") + + LOGGER.info( + f"Found {connector_type} table '{table_name}' " + f"with qualified_name: {table.qualified_name}" + ) + return TableInfo(name=table_name, qualified_name=table.qualified_name) + + +def get_current_tags(client: AtlanClient, qualified_name: str) -> Set[str]: + """Helper to retrieve current tags on an asset.""" + asset = client.asset.get_by_qualified_name( + qualified_name=qualified_name, + asset_type=Table, + ) + return {str(t.type_name) for t in (asset.atlan_tags or [])} + + +def remove_tags_from_asset( + client: AtlanClient, qualified_name: str, name: str, tag_names: List[str] +) -> None: + """Helper to remove specific tags from an asset.""" + if not tag_names: + return + table = Table.updater(qualified_name=qualified_name, name=name) + table.atlan_tags = [ + AtlanTag.of(atlan_tag_name=AtlanTagName(tag), semantic=SaveSemantic.REMOVE) + for tag in tag_names + ] + try: + client.asset.save(entity=table) + except Exception as e: + LOGGER.debug(f"Tag removal (may be expected if tag not present): {e}") + + +def remove_all_tags_from_asset( + client: AtlanClient, qualified_name: str, name: str +) -> None: + """Helper to remove ALL tags from an asset (complete cleanup).""" + try: + current_tags = get_current_tags(client, qualified_name) + if current_tags: + LOGGER.info(f"Removing all tags from {name}: {current_tags}") + remove_tags_from_asset(client, qualified_name, name, list(current_tags)) + except Exception as e: + LOGGER.debug(f"Complete tag removal (may be expected): {e}") + + +@pytest.fixture(scope="module") +def table1(client: AtlanClient) -> TableInfo: + """Fixture to find TABLE1 (BUYINGGROUPS) dynamically.""" + return find_table_by_name(client, TABLE1_NAME) + + +@pytest.fixture(scope="module") +def table2(client: AtlanClient) -> TableInfo: + """Fixture to find TABLE2 (CITIES_ARCHIVE) dynamically.""" + return find_table_by_name(client, TABLE2_NAME) + + +@pytest.fixture(scope="module", autouse=True) +def complete_cleanup_before_tests( + client: AtlanClient, table1: TableInfo, table2: TableInfo +) -> Generator[None, None, None]: + """ + Module-scoped fixture to do a COMPLETE cleanup of ALL tags before any tests run. + This ensures a clean slate at the start of the test module. + """ + LOGGER.info("Performing complete cleanup of ALL tags before tests...") + + # Remove ALL tags from both tables + for table_info in [table1, table2]: + remove_all_tags_from_asset( + client, + table_info.qualified_name, + table_info.name, + ) + time.sleep(3) + + # Verify cleanup was successful + for table_info in [table1, table2]: + tags = get_current_tags(client, table_info.qualified_name) + LOGGER.info(f"After complete cleanup - {table_info.name} tags: {tags}") + + yield + + # Final cleanup after all tests + LOGGER.info("Final cleanup after all tests...") + for table_info in [table1, table2]: + remove_all_tags_from_asset( + client, + table_info.qualified_name, + table_info.name, + ) + + +@pytest.fixture(autouse=True) +def cleanup_tags( + client: AtlanClient, table1: TableInfo, table2: TableInfo +) -> Generator[None, None, None]: + """ + Fixture to ensure test tags are removed before and after each test. + This maintains a clean state for testing. + """ + # Cleanup before test - remove our test tags + for table_info in [table1, table2]: + remove_tags_from_asset( + client, + table_info.qualified_name, + table_info.name, + [TAG_ISSUE, TAG_CONFIDENTIAL], + ) + time.sleep(2) + + yield + + # Cleanup after test - remove our test tags + for table_info in [table1, table2]: + remove_tags_from_asset( + client, + table_info.qualified_name, + table_info.name, + [TAG_ISSUE, TAG_CONFIDENTIAL], + ) + + +@pytest.mark.order(1) +def test_append_single_tag( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test APPEND semantic - Add a single tag. + Expected: Tag is added to the asset. + """ + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + ] + + response = client.asset.save(entity=table) + time.sleep(3) + + # Verify response and tags + assert response is not None + assert response.mutated_entities is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags + LOGGER.info(f"APPEND single tag test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_append_single_tag") +def test_append_multiple_tags( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test APPEND semantic - Add multiple tags in one request. + Expected: All tags are added to the asset. + """ + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.APPEND, + ), + ] + + response = client.asset.save(entity=table) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags + assert TAG_CONFIDENTIAL in tags + LOGGER.info(f"APPEND multiple tags test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_append_multiple_tags") +def test_remove_tag_after_append( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test REMOVE semantic - Add a tag, then remove it. + Expected: Tag is removed from the asset. + """ + # First add a tag + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + ] + client.asset.save(entity=table) + time.sleep(3) + + # Verify tag was added + tags_after_add = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags_after_add, "Setup failed: tag not added" + + # Remove the tag + table_remove = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table_remove.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.REMOVE, + ), + ] + response = client.asset.save(entity=table_remove) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE not in tags + LOGGER.info(f"REMOVE tag test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_remove_tag_after_append") +def test_replace_all_tags( + client: AtlanClient, + table2: TableInfo, +) -> None: + """ + Test REPLACE semantic - Replace all tags with a new one. + Expected: Only the new tag remains on the asset. + """ + # First add Issue tag + table = Table.updater(qualified_name=table2.qualified_name, name=table2.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + ] + client.asset.save(entity=table) + time.sleep(3) + + # Verify tag was added + tags_after_add = get_current_tags(client, table2.qualified_name) + assert TAG_ISSUE in tags_after_add, "Setup failed: tag not added" + + # Replace with Confidential + table_replace = Table.updater( + qualified_name=table2.qualified_name, name=table2.name + ) + table_replace.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.REPLACE, + ), + ] + response = client.asset.save(entity=table_replace) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table2.qualified_name) + assert TAG_CONFIDENTIAL in tags + assert TAG_ISSUE not in tags + LOGGER.info(f"REPLACE tags test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_replace_all_tags") +def test_append_and_remove_combined( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test mixed semantics - APPEND and REMOVE in same request. + Expected: Issue is added, Confidential is removed. + """ + # First add Confidential tag + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.APPEND, + ), + ] + client.asset.save(entity=table) + time.sleep(3) + + # Verify setup + tags_after_add = get_current_tags(client, table1.qualified_name) + assert TAG_CONFIDENTIAL in tags_after_add, ( + "Setup failed: Confidential tag not added" + ) + + # APPEND Issue and REMOVE Confidential in same request + table_mixed = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table_mixed.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.REMOVE, + ), + ] + response = client.asset.save(entity=table_mixed) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags + assert TAG_CONFIDENTIAL not in tags + LOGGER.info(f"Mixed APPEND/REMOVE test passed. Tags: {tags}") + + +@pytest.mark.order(after="test_append_and_remove_combined") +def test_bulk_save_with_mixed_semantics( + client: AtlanClient, + table1: TableInfo, + table2: TableInfo, +) -> None: + """ + Test bulk save with APPEND and REPLACE semantics across different assets. + - Table1: APPEND Issue and Confidential + - Table2: REPLACE with Issue only + Expected: Both tables updated correctly based on their semantics. + """ + # First add Confidential to Table2 to test REPLACE + table2_setup = Table.updater(qualified_name=table2.qualified_name, name=table2.name) + table2_setup.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.APPEND, + ), + ] + client.asset.save(entity=table2_setup) + time.sleep(3) + + # Verify setup + tags_table2_setup = get_current_tags(client, table2.qualified_name) + assert TAG_CONFIDENTIAL in tags_table2_setup, ( + "Setup failed: Confidential not on Table2" + ) + + # Bulk save with different semantics + t1 = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + t1.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.APPEND, + ), + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_CONFIDENTIAL), + semantic=SaveSemantic.APPEND, + ), + ] + + t2 = Table.updater(qualified_name=table2.qualified_name, name=table2.name) + t2.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE), + semantic=SaveSemantic.REPLACE, + ), + ] + + response = client.asset.save(entity=[t1, t2]) + time.sleep(3) + + # Verify response + assert response is not None + + # Check Table1 - should have both tags (APPEND) + tags1 = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags1, f"Table1: Expected '{TAG_ISSUE}' in tags" + assert TAG_CONFIDENTIAL in tags1, f"Table1: Expected '{TAG_CONFIDENTIAL}' in tags" + + # Check Table2 - should only have Issue (REPLACE removed Confidential) + tags2 = get_current_tags(client, table2.qualified_name) + assert TAG_ISSUE in tags2, f"Table2: Expected '{TAG_ISSUE}' in tags" + assert TAG_CONFIDENTIAL not in tags2, ( + f"Table2: REPLACE should have removed '{TAG_CONFIDENTIAL}'" + ) + + LOGGER.info(f"Bulk save test passed. Table1 tags: {tags1}, Table2 tags: {tags2}") + + +@pytest.mark.order(after="test_bulk_save_with_mixed_semantics") +def test_backward_compatibility_no_semantic( + client: AtlanClient, + table1: TableInfo, +) -> None: + """ + Test backward compatibility - tags with no semantic (None) work as before. + Expected: Tag is added using existing behavior. + """ + # Use AtlanTag directly without semantic (backward compatible) + table = Table.updater(qualified_name=table1.qualified_name, name=table1.name) + table.atlan_tags = [ + AtlanTag.of( + atlan_tag_name=AtlanTagName(TAG_ISSUE) + ), # No semantic - backward compatible + ] + + response = client.asset.save(entity=table, replace_atlan_tags=True) + time.sleep(3) + + # Verify response and tags + assert response is not None + tags = get_current_tags(client, table1.qualified_name) + assert TAG_ISSUE in tags + LOGGER.info(f"Backward compatibility test passed. Tags: {tags}") diff --git a/tests/unit/test_save_semantic.py b/tests/unit/test_save_semantic.py new file mode 100644 index 000000000..5a6fbcc44 --- /dev/null +++ b/tests/unit/test_save_semantic.py @@ -0,0 +1,486 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2026 Atlan Pte. Ltd. + +""" +Unit tests for SaveSemantic feature for in-bulk asset tag management. + +Tests cover: +1. AtlanTag semantic field behavior +2. Entity splitting logic based on semantic values +3. Processing of APPEND/REMOVE tags +4. Response merging +""" + +from unittest.mock import MagicMock + +import pytest + +from pyatlan.client.common.asset import Save +from pyatlan.model.assets import Table +from pyatlan.model.core import AtlanTag, AtlanTagName +from pyatlan.model.enums import SaveSemantic + + +# ============================================================================= +# Test 1: AtlanTag semantic field defaults to None (backward compatibility) +# ============================================================================= +def test_atlan_tag_semantic_defaults_to_none(): + """Verify AtlanTag.semantic defaults to None for backward compatibility.""" + tag = AtlanTag(type_name=AtlanTagName("TestTag")) + assert tag.semantic is None + + +# ============================================================================= +# Test 2: AtlanTag.of() accepts semantic parameter +# ============================================================================= +def test_atlan_tag_of_with_semantic(): + """Verify AtlanTag.of() factory method accepts semantic parameter.""" + tag_append = AtlanTag.of( + atlan_tag_name=AtlanTagName("Tag1"), + semantic=SaveSemantic.APPEND, + ) + tag_remove = AtlanTag.of( + atlan_tag_name=AtlanTagName("Tag2"), + semantic=SaveSemantic.REMOVE, + ) + tag_replace = AtlanTag.of( + atlan_tag_name=AtlanTagName("Tag3"), + semantic=SaveSemantic.REPLACE, + ) + + assert tag_append.semantic == SaveSemantic.APPEND + assert tag_remove.semantic == SaveSemantic.REMOVE + assert tag_replace.semantic == SaveSemantic.REPLACE + + +# ============================================================================= +# Test 3: Semantic field is excluded from JSON serialization +# ============================================================================= +def test_atlan_tag_semantic_excluded_from_json(): + """Verify semantic field is excluded from JSON (not sent to API).""" + tag = AtlanTag(type_name=AtlanTagName("TestTag"), semantic=SaveSemantic.APPEND) + json_dict = tag.dict(by_alias=True, exclude_none=True) + assert "semantic" not in json_dict + + +# ============================================================================= +# Test 4: has_tags_with_semantic() detects semantic tags +# ============================================================================= +def test_has_tags_with_semantic_detection(): + """Verify has_tags_with_semantic correctly detects semantic vs non-semantic tags.""" + # Entity with APPEND semantic - should return True + table_append = Table() + table_append.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + assert Save.has_tags_with_semantic([table_append]) is True + + # Entity with no semantic (None) - should return False + table_none = Table() + table_none.atlan_tags = [AtlanTag(type_name=AtlanTagName("Tag2"))] + assert Save.has_tags_with_semantic([table_none]) is False + + # Entity with no tags - should return False + table_empty = Table() + table_empty.atlan_tags = None + assert Save.has_tags_with_semantic([table_empty]) is False + + +# ============================================================================= +# Test 5: Split entities - APPEND/REMOVE goes to append_remove bucket +# ============================================================================= +def test_split_entities_append_remove_bucket(): + """Verify APPEND and REMOVE semantic tags go to append_remove bucket.""" + table_append = Table() + table_append.qualified_name = "table_append" + table_append.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + + table_remove = Table() + table_remove.qualified_name = "table_remove" + table_remove.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REMOVE) + ] + + append_remove, replace, no_semantic = Save.split_entities_by_tag_semantic( + [table_append, table_remove] + ) + + assert len(append_remove) == 2 + assert len(replace) == 0 + assert len(no_semantic) == 0 + + +# ============================================================================= +# Test 6: Split entities - REPLACE goes to replace bucket (only if no APPEND/REMOVE) +# ============================================================================= +def test_split_entities_replace_bucket(): + """Verify REPLACE semantic tags go to replace bucket when no APPEND/REMOVE present.""" + table_replace = Table() + table_replace.qualified_name = "table_replace" + table_replace.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.REPLACE) + ] + + append_remove, replace, no_semantic = Save.split_entities_by_tag_semantic( + [table_replace] + ) + + assert len(append_remove) == 0 + assert len(replace) == 1 + assert len(no_semantic) == 0 + + +# ============================================================================= +# Test 7: Split entities - None semantic goes to no_semantic bucket +# ============================================================================= +def test_split_entities_no_semantic_bucket(): + """Verify tags with None semantic (backward compatible) go to no_semantic bucket.""" + table_none = Table() + table_none.qualified_name = "table_none" + table_none.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1")) # semantic=None + ] + + table_no_tags = Table() + table_no_tags.qualified_name = "table_no_tags" + table_no_tags.atlan_tags = None + + append_remove, replace, no_semantic = Save.split_entities_by_tag_semantic( + [table_none, table_no_tags] + ) + + assert len(append_remove) == 0 + assert len(replace) == 0 + assert len(no_semantic) == 2 + + +# ============================================================================= +# Test 8: Split entities - Mixed semantics on same entity (APPEND takes priority) +# ============================================================================= +def test_split_entities_mixed_semantics_append_priority(): + """Verify entity with both APPEND and REPLACE goes to append_remove (APPEND priority).""" + table_mixed = Table() + table_mixed.qualified_name = "table_mixed" + table_mixed.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REPLACE), + ] + + append_remove, replace, no_semantic = Save.split_entities_by_tag_semantic( + [table_mixed] + ) + + # APPEND/REMOVE takes priority - entity goes to append_remove bucket + assert len(append_remove) == 1 + assert len(replace) == 0 + assert len(no_semantic) == 0 + + +# ============================================================================= +# Test 9: Process APPEND/REMOVE - tags moved to correct classification fields +# ============================================================================= +def test_process_asset_append_remove_semantic(): + """Verify APPEND tags go to add_or_update_classifications, REMOVE to remove_classifications.""" + table = Table() + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("AppendTag"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("RemoveTag"), semantic=SaveSemantic.REMOVE), + ] + + Save.process_asset_for_append_remove_semantic(table) + + # atlan_tags should be cleared + assert table.atlan_tags is None + # APPEND tag should be in add_or_update_classifications + assert table.add_or_update_classifications is not None + assert len(table.add_or_update_classifications) == 1 + # REMOVE tag should be in remove_classifications + assert table.remove_classifications is not None + assert len(table.remove_classifications) == 1 + + +# ============================================================================= +# Test 10: Merge responses combines multiple API responses correctly +# ============================================================================= +def test_merge_responses(): + """Verify merge_responses correctly combines multiple AssetMutationResponse objects.""" + from pyatlan.model.response import AssetMutationResponse, MutatedEntities + + table1 = Table() + table1.guid = "guid1" + table2 = Table() + table2.guid = "guid2" + + response1 = AssetMutationResponse( + guid_assignments={"temp1": "real1"}, + mutated_entities=MutatedEntities(CREATE=[table1]), + ) + response2 = AssetMutationResponse( + guid_assignments={"temp2": "real2"}, + mutated_entities=MutatedEntities(UPDATE=[table2]), + ) + + result = Save.merge_responses([response1, response2]) + + assert result is not None + assert result.guid_assignments == {"temp1": "real1", "temp2": "real2"} + assert result.mutated_entities is not None + assert result.mutated_entities.CREATE is not None + assert result.mutated_entities.UPDATE is not None + assert len(result.mutated_entities.CREATE) == 1 + assert len(result.mutated_entities.UPDATE) == 1 + + +# ============================================================================= +# API Call Count Tests - Verify correct number of API calls for different semantics +# ============================================================================= + + +def _create_mock_response(): + """Helper to create a mock API response JSON.""" + return { + "guidAssignments": {"temp": "real"}, + "mutatedEntities": { + "CREATE": [], + "UPDATE": [{"typeName": "Table", "guid": "test-guid"}], + }, + } + + +@pytest.fixture +def mock_asset_client(): + """Create a mock AssetClient for testing API call counts.""" + from pyatlan.client.asset import AssetClient + from pyatlan.client.common import ApiCaller + + mock_api_caller = MagicMock(spec=ApiCaller) + mock_api_caller._call_api = MagicMock(return_value=_create_mock_response()) + + # Create AssetClient with mocked api caller + client = AssetClient.__new__(AssetClient) + object.__setattr__(client, "_client", mock_api_caller) + + return client, mock_api_caller._call_api + + +# ============================================================================= +# Test 11: Single APPEND semantic = 1 API call +# ============================================================================= +def test_api_call_count_single_append(mock_asset_client): + """Verify single APPEND semantic results in 1 API call.""" + client, mock_call_api = mock_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + + client.save(entity=table) + + assert mock_call_api.call_count == 1 + + +# ============================================================================= +# Test 12: Single REMOVE semantic = 1 API call +# ============================================================================= +def test_api_call_count_single_remove(mock_asset_client): + """Verify single REMOVE semantic results in 1 API call.""" + client, mock_call_api = mock_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.REMOVE) + ] + + client.save(entity=table) + + assert mock_call_api.call_count == 1 + + +# ============================================================================= +# Test 13: Single REPLACE semantic = 1 API call +# ============================================================================= +def test_api_call_count_single_replace(mock_asset_client): + """Verify single REPLACE semantic results in 1 API call.""" + client, mock_call_api = mock_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.REPLACE) + ] + + client.save(entity=table) + + assert mock_call_api.call_count == 1 + + +# ============================================================================= +# Test 14: APPEND + REMOVE combined on same entity = 1 API call +# ============================================================================= +def test_api_call_count_append_remove_combined(mock_asset_client): + """Verify APPEND and REMOVE on same entity results in 1 API call.""" + client, mock_call_api = mock_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REMOVE), + ] + + client.save(entity=table) + + # APPEND and REMOVE go in same call (append_atlan_tags=True) + assert mock_call_api.call_count == 1 + + +# ============================================================================= +# Test 15: APPEND + REPLACE on different entities = 2 API calls +# ============================================================================= +def test_api_call_count_append_replace_different_entities(mock_asset_client): + """Verify APPEND and REPLACE on different entities = 2 API calls.""" + client, mock_call_api = mock_asset_client + + table1 = Table() + table1.qualified_name = "test/table1" + table1.name = "test_table1" + table1.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + + table2 = Table() + table2.qualified_name = "test/table2" + table2.name = "test_table2" + table2.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REPLACE) + ] + + client.save(entity=[table1, table2]) + + # APPEND and REPLACE require separate API calls + assert mock_call_api.call_count == 2 + + +# ============================================================================= +# Test 16: No semantic (None) = 1 API call (backward compatible) +# ============================================================================= +def test_api_call_count_no_semantic(mock_asset_client): + """Verify no semantic (None) results in 1 API call via existing path.""" + client, mock_call_api = mock_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1")) # No semantic + ] + + client.save(entity=table) + + assert mock_call_api.call_count == 1 + + +# ============================================================================= +# Test 17: APPEND + REPLACE + None semantic = 3 API calls +# ============================================================================= +def test_api_call_count_all_three_semantics(mock_asset_client): + """Verify APPEND, REPLACE, and None semantics on different entities = 3 API calls.""" + client, mock_call_api = mock_asset_client + + table_append = Table() + table_append.qualified_name = "test/table_append" + table_append.name = "table_append" + table_append.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND) + ] + + table_replace = Table() + table_replace.qualified_name = "test/table_replace" + table_replace.name = "table_replace" + table_replace.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REPLACE) + ] + + table_none = Table() + table_none.qualified_name = "test/table_none" + table_none.name = "table_none" + table_none.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag3")) # No semantic + ] + + client.save(entity=[table_append, table_replace, table_none]) + + # Three different semantic paths = 3 API calls + assert mock_call_api.call_count == 3 + + +# ============================================================================= +# Test 18: Multiple APPEND entities = 1 API call (batched together) +# ============================================================================= +def test_api_call_count_multiple_append_batched(mock_asset_client): + """Verify multiple APPEND entities are batched into 1 API call.""" + client, mock_call_api = mock_asset_client + + tables = [] + for i in range(5): + table = Table() + table.qualified_name = f"test/table_{i}" + table.name = f"table_{i}" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName(f"Tag{i}"), semantic=SaveSemantic.APPEND) + ] + tables.append(table) + + client.save(entity=tables) + + # All APPEND entities batched into single API call + assert mock_call_api.call_count == 1 + + +# ============================================================================= +# Test 19: Mixed APPEND/REMOVE/REPLACE on same entity - APPEND/REMOVE prioritized +# ============================================================================= +def test_api_call_count_mixed_on_same_entity(mock_asset_client): + """Verify entity with APPEND+REPLACE goes to append_remove bucket (1 call).""" + client, mock_call_api = mock_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = [ + AtlanTag(type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND), + AtlanTag(type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REPLACE), + AtlanTag(type_name=AtlanTagName("Tag3"), semantic=SaveSemantic.REMOVE), + ] + + client.save(entity=table) + + # APPEND/REMOVE takes priority, entity goes to append_remove bucket = 1 call + assert mock_call_api.call_count == 1 + + +# ============================================================================= +# Test 20: Asset without tags = 1 API call (no semantic processing) +# ============================================================================= +def test_api_call_count_asset_without_tags(mock_asset_client): + """Verify asset without tags results in 1 API call (standard save).""" + client, mock_call_api = mock_asset_client + + table = Table() + table.qualified_name = "test/table" + table.name = "test_table" + table.atlan_tags = None # No tags at all + + client.save(entity=table) + + # Standard save path = 1 API call + assert mock_call_api.call_count == 1