diff --git a/HISTORY.md b/HISTORY.md index c36e707da..b9af2229a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,3 +1,15 @@ +## 8.4.6 (January 13, 2025) + +### New Features + +- Added support for OAuth client management (`create`, `get`, `get_by_id`, `update`, `purge`). +- Added optional `field_type` parameter to `SearchableField.has_any_value()`, allowing users to handle cases when text exceeds **5K characters** (the keyword field on an attribute can be empty while the text field on the same attribute is populated). + +### QOL Improvements + +- Regenerated models with latest typedef definitions. +- Published `pyatlan` with Chainguard golden image for each new release. + ## 8.4.5 (December 15, 2025) ### QOL Improvements diff --git a/pyatlan/client/common/asset.py b/pyatlan/client/common/asset.py index ca265a7e1..a49ce4330 100644 --- a/pyatlan/client/common/asset.py +++ b/pyatlan/client/common/asset.py @@ -675,6 +675,54 @@ def process_direct_api_response( class Save: + @staticmethod + def _process_tags_by_semantic(asset: Asset) -> Asset: + """ + Process tags in an asset by moving them to the appropriate lists + based on their semantic value. + + :param asset: asset to process + :returns: processed asset + """ + # Lists to collect tags by semantic + append_tags = [] + remove_tags = [] + replace_tags = [] + + # Process atlan_tags + if hasattr(asset, "atlan_tags") and asset.atlan_tags: + for tag in asset.atlan_tags: + if hasattr(tag, "semantic") and tag.semantic == SaveSemantic.APPEND: + append_tags.append(tag) + elif hasattr(tag, "semantic") and tag.semantic == SaveSemantic.REMOVE: + remove_tags.append(tag) + else: + # REPLACE or None + replace_tags.append(tag) + + # Process add_or_update_classifications + if ( + hasattr(asset, "add_or_update_classifications") + and asset.add_or_update_classifications + ): + for tag in asset.add_or_update_classifications: + if hasattr(tag, "semantic") and tag.semantic == SaveSemantic.REMOVE: + remove_tags.append(tag) + else: + # APPEND or None - both go to add_or_update + append_tags.append(tag) + + # Process remove_classifications + if hasattr(asset, "remove_classifications") and asset.remove_classifications: + remove_tags.extend(asset.remove_classifications) + + # Set the processed tags back on the asset + asset.atlan_tags = replace_tags if replace_tags else None + asset.add_or_update_classifications = append_tags if append_tags else None + asset.remove_classifications = remove_tags if remove_tags else None + + return asset + @staticmethod def prepare_request( entity: Union[Asset, List[Asset]], @@ -695,13 +743,6 @@ def prepare_request( :param client: the Atlan client instance for flushing custom metadata :returns: tuple of (query_params, bulk_request) """ - query_params = { - "replaceTags": replace_atlan_tags, - "appendTags": append_atlan_tags, - "replaceBusinessAttributes": replace_custom_metadata, - "overwriteBusinessAttributes": overwrite_custom_metadata, - } - entities: List[Asset] = [] if isinstance(entity, list): entities.extend(entity) @@ -712,9 +753,45 @@ def prepare_request( raise ValueError( "AtlanClient instance must be provided to validate and flush cm for assets." ) - # Validate and flush entities BEFORE creating the BulkRequest + + # Validate and flush entities BEFORE processing Save.validate_and_flush_entities(entities, client) - return query_params, BulkRequest[Asset](entities=entities) + + # Process tags by semantic for each asset + processed_entities = [] + has_replace_semantic = False + has_append_or_remove_semantic = False + + for asset in entities: + processed_asset = Save._process_tags_by_semantic(asset) + processed_entities.append(processed_asset) + + # Check if any tags have REPLACE semantic (will be in atlan_tags) + if processed_asset.atlan_tags: + has_replace_semantic = True + + # Check if any tags have APPEND/REMOVE semantic + if ( + processed_asset.add_or_update_classifications + or processed_asset.remove_classifications + ): + has_append_or_remove_semantic = True + + # Build query parameters based on semantic usage + query_params = { + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + + # When REPLACE semantic is used, use replaceClassifications + if has_replace_semantic or replace_atlan_tags: + query_params["replaceClassifications"] = True + query_params["appendTags"] = False + # When APPEND/REMOVE semantic is used, use appendTags + elif has_append_or_remove_semantic or append_atlan_tags: + query_params["appendTags"] = True + + return query_params, BulkRequest[Asset](entities=processed_entities) @staticmethod async def prepare_request_async( @@ -736,13 +813,6 @@ async def prepare_request_async( :param client: Optional[AsyncAtlanClient] = None, :returns: tuple of (query_params, bulk_request) """ - query_params = { - "replaceTags": replace_atlan_tags, - "appendTags": append_atlan_tags, - "replaceBusinessAttributes": replace_custom_metadata, - "overwriteBusinessAttributes": overwrite_custom_metadata, - } - entities: List[Asset] = [] if isinstance(entity, list): entities.extend(entity) @@ -753,9 +823,45 @@ async def prepare_request_async( raise ValueError( "AsyncAtlanClient instance must be provided to validate and flush cm for assets." ) - # Validate and flush entities BEFORE creating the BulkRequest + + # Validate and flush entities BEFORE processing await Save.validate_and_flush_entities_async(entities, client) - return query_params, BulkRequest[Asset](entities=entities) + + # Process tags by semantic for each asset + processed_entities = [] + has_replace_semantic = False + has_append_or_remove_semantic = False + + for asset in entities: + processed_asset = Save._process_tags_by_semantic(asset) + processed_entities.append(processed_asset) + + # Check if any tags have REPLACE semantic (will be in atlan_tags) + if processed_asset.atlan_tags: + has_replace_semantic = True + + # Check if any tags have APPEND/REMOVE semantic + if ( + processed_asset.add_or_update_classifications + or processed_asset.remove_classifications + ): + has_append_or_remove_semantic = True + + # Build query parameters based on semantic usage + query_params = { + "replaceBusinessAttributes": replace_custom_metadata, + "overwriteBusinessAttributes": overwrite_custom_metadata, + } + + # When REPLACE semantic is used, use replaceClassifications + if has_replace_semantic or replace_atlan_tags: + query_params["replaceClassifications"] = True + query_params["appendTags"] = False + # When APPEND/REMOVE semantic is used, use appendTags + elif has_append_or_remove_semantic or append_atlan_tags: + query_params["appendTags"] = True + + return query_params, BulkRequest[Asset](entities=processed_entities) @staticmethod def validate_and_flush_entities(entities: List[Asset], client: AtlanClient) -> None: diff --git a/pyatlan/model/core.py b/pyatlan/model/core.py index 0a4b89da0..e0ad49ea8 100644 --- a/pyatlan/model/core.py +++ b/pyatlan/model/core.py @@ -323,6 +323,17 @@ class Config: source_tag_attachments: List[SourceTagAttachment] = Field( default_factory=list, exclude=True ) + semantic: Optional[SaveSemantic] = Field( + default=None, + exclude=True, + description=( + "Semantic for how this Atlan tag should be saved, " + "if used in an asset request on which `.save()` is called. " + "APPEND: add the tag if it doesn't exist, or update it if it does. " + "REMOVE: remove the tag if it exists. " + "REPLACE: replace all existing tags on the asset (default behavior)." + ), + ) attributes: Optional[Dict[str, Any]] = None tag_id: Optional[str] = Field(default=None, exclude=True) @@ -334,6 +345,7 @@ def of( entity_guid: Optional[str] = None, source_tag_attachment: Optional[SourceTagAttachment] = None, client: Optional[AtlanClient] = None, + semantic: Optional[SaveSemantic] = None, ) -> AtlanTag: """ Construct an Atlan tag assignment for a specific entity. @@ -342,10 +354,11 @@ def of( :param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned :param source_tag_attachment: (optional) source-specific details for the tag :param client: (optional) client instance used for translating source-specific details + :param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE) :return: an Atlan tag assignment with default settings for propagation and a specific entity assignment :raises InvalidRequestError: if client is not provided and source_tag_attachment is specified """ - tag = AtlanTag(type_name=atlan_tag_name) # type: ignore[call-arg] + tag = AtlanTag(type_name=atlan_tag_name, semantic=semantic) # type: ignore[call-arg] if entity_guid: tag.entity_guid = entity_guid tag.entity_status = EntityStatus.ACTIVE @@ -367,6 +380,7 @@ async def of_async( entity_guid: Optional[str] = None, source_tag_attachment: Optional[SourceTagAttachment] = None, client: Optional[AsyncAtlanClient] = None, + semantic: Optional[SaveSemantic] = None, ) -> AtlanTag: """ Async version of AtlanTag.of() for use with AsyncAtlanClient. @@ -377,10 +391,11 @@ async def of_async( :param entity_guid: unique identifier (GUID) of the entity to which the Atlan tag is to be assigned :param source_tag_attachment: (optional) source-specific details for the tag :param client: (optional) async client instance used for translating source-specific details + :param semantic: (optional) semantic for how this tag should be saved (APPEND, REMOVE, or REPLACE) :return: an Atlan tag assignment with default settings for propagation and a specific entity assignment :raises InvalidRequestError: if client is not provided and source_tag_attachment is specified """ - tag = AtlanTag(type_name=atlan_tag_name) # type: ignore[call-arg] + tag = AtlanTag(type_name=atlan_tag_name, semantic=semantic) # type: ignore[call-arg] if entity_guid: tag.entity_guid = entity_guid tag.entity_status = EntityStatus.ACTIVE diff --git a/pyatlan/version.txt b/pyatlan/version.txt index 81e08b593..2924409ea 100644 --- a/pyatlan/version.txt +++ b/pyatlan/version.txt @@ -1 +1 @@ -8.4.5 +8.4.6 diff --git a/tests/unit/test_tag_semantic.py b/tests/unit/test_tag_semantic.py new file mode 100644 index 000000000..4cf89b089 --- /dev/null +++ b/tests/unit/test_tag_semantic.py @@ -0,0 +1,391 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2025 Atlan Pte. Ltd. +from unittest.mock import MagicMock, patch + +import pytest + +from pyatlan.client.atlan import AtlanClient +from pyatlan.model.assets import Table +from pyatlan.model.core import AtlanTag, AtlanTagName +from pyatlan.model.enums import SaveSemantic + + +@pytest.fixture(autouse=True) +def set_env(monkeypatch): + monkeypatch.setenv("ATLAN_API_KEY", "test-api-key") + monkeypatch.setenv("ATLAN_BASE_URL", "https://test.atlan.com") + + +@pytest.fixture() +def client(): + return AtlanClient() + + +@pytest.fixture() +def mock_tag_cache(monkeypatch): + mock_cache = MagicMock() + mock_cache.get_id_for_name.return_value = "test-tag-id" + mock_cache.get_source_tags_attr_id.return_value = "sourceTagsAttrId" + # Patch at the class level + monkeypatch.setattr(AtlanClient, "atlan_tag_cache", mock_cache) + return mock_cache + + +class TestAtlanTagSemantic: + """Test suite for AtlanTag semantic field functionality.""" + + def test_atlan_tag_with_semantic_field(self): + """Test that AtlanTag can be created with semantic field.""" + tag = AtlanTag( + type_name=AtlanTagName("TestTag"), # type: ignore[call-arg] + semantic=SaveSemantic.APPEND, + ) + assert tag.semantic == SaveSemantic.APPEND + assert str(tag.type_name) == "TestTag" + + def test_atlan_tag_semantic_default_none(self): + """Test that semantic defaults to None.""" + tag = AtlanTag(type_name=AtlanTagName("TestTag")) # type: ignore[call-arg] + assert tag.semantic is None + + def test_atlan_tag_of_with_semantic_append(self, client, mock_tag_cache): + """Test AtlanTag.of() with APPEND semantic.""" + tag = AtlanTag.of( + atlan_tag_name=AtlanTagName("TestTag"), + entity_guid="test-guid-123", + semantic=SaveSemantic.APPEND, + client=client, + ) + assert tag.semantic == SaveSemantic.APPEND + assert tag.entity_guid == "test-guid-123" + assert str(tag.type_name) == "TestTag" + + def test_atlan_tag_of_with_semantic_remove(self, client, mock_tag_cache): + """Test AtlanTag.of() with REMOVE semantic.""" + tag = AtlanTag.of( + atlan_tag_name=AtlanTagName("TestTag"), + entity_guid="test-guid-456", + semantic=SaveSemantic.REMOVE, + client=client, + ) + assert tag.semantic == SaveSemantic.REMOVE + assert tag.entity_guid == "test-guid-456" + + def test_atlan_tag_of_with_semantic_replace(self, client, mock_tag_cache): + """Test AtlanTag.of() with REPLACE semantic (default behavior).""" + tag = AtlanTag.of( + atlan_tag_name=AtlanTagName("TestTag"), + entity_guid="test-guid-789", + semantic=SaveSemantic.REPLACE, + client=client, + ) + assert tag.semantic == SaveSemantic.REPLACE + assert tag.entity_guid == "test-guid-789" + + def test_atlan_tag_of_without_semantic(self, client, mock_tag_cache): + """Test AtlanTag.of() without semantic parameter (backward compatibility).""" + tag = AtlanTag.of( + atlan_tag_name=AtlanTagName("TestTag"), + entity_guid="test-guid-000", + client=client, + ) + assert tag.semantic is None + assert tag.entity_guid == "test-guid-000" + + @pytest.mark.asyncio + async def test_atlan_tag_of_async_with_semantic(self, client, mock_tag_cache): + """Test AtlanTag.of_async() with semantic parameter.""" + # Mock the async methods + mock_tag_cache.get_id_for_name = MagicMock(return_value="test-tag-id") + mock_tag_cache.get_source_tags_attr_id = MagicMock( + return_value="sourceTagsAttrId" + ) + + tag = await AtlanTag.of_async( + atlan_tag_name=AtlanTagName("TestTag"), + entity_guid="async-guid-123", + semantic=SaveSemantic.APPEND, + ) + assert tag.semantic == SaveSemantic.APPEND + assert tag.entity_guid == "async-guid-123" + + +class TestSaveTagSemanticProcessing: + """Test suite for Save._process_tags_by_semantic functionality.""" + + def test_process_tags_append_semantic(self): + """Test processing tags with APPEND semantic.""" + from pyatlan.client.common.asset import Save + + # Create an asset with tags having APPEND semantic + asset = Table() + asset.atlan_tags = [ + AtlanTag( # type: ignore[call-arg] + type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND + ), + AtlanTag( # type: ignore[call-arg] + type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.APPEND + ), + ] + + processed_asset = Save._process_tags_by_semantic(asset) + + # Tags with APPEND semantic should be in add_or_update_classifications + assert processed_asset.add_or_update_classifications is not None + assert len(processed_asset.add_or_update_classifications) == 2 + assert processed_asset.atlan_tags is None + assert processed_asset.remove_classifications is None + + def test_process_tags_remove_semantic(self): + """Test processing tags with REMOVE semantic.""" + from pyatlan.client.common.asset import Save + + asset = Table() + asset.atlan_tags = [ + AtlanTag( # type: ignore[call-arg] + type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.REMOVE + ), + ] + + processed_asset = Save._process_tags_by_semantic(asset) + + # Tags with REMOVE semantic should be in remove_classifications + assert processed_asset.remove_classifications is not None + assert len(processed_asset.remove_classifications) == 1 + assert processed_asset.atlan_tags is None + assert processed_asset.add_or_update_classifications is None + + def test_process_tags_replace_semantic(self): + """Test processing tags with REPLACE semantic.""" + from pyatlan.client.common.asset import Save + + asset = Table() + asset.atlan_tags = [ + AtlanTag( # type: ignore[call-arg] + type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.REPLACE + ), + ] + + processed_asset = Save._process_tags_by_semantic(asset) + + # Tags with REPLACE or None semantic should stay in atlan_tags + assert processed_asset.atlan_tags is not None + assert len(processed_asset.atlan_tags) == 1 + assert processed_asset.add_or_update_classifications is None + assert processed_asset.remove_classifications is None + + def test_process_tags_mixed_semantics(self): + """Test processing tags with mixed semantics.""" + from pyatlan.client.common.asset import Save + + asset = Table() + asset.atlan_tags = [ + AtlanTag( # type: ignore[call-arg] + type_name=AtlanTagName("AppendTag"), semantic=SaveSemantic.APPEND + ), + AtlanTag( # type: ignore[call-arg] + type_name=AtlanTagName("RemoveTag"), semantic=SaveSemantic.REMOVE + ), + AtlanTag( # type: ignore[call-arg] + type_name=AtlanTagName("ReplaceTag"), semantic=SaveSemantic.REPLACE + ), + ] + + processed_asset = Save._process_tags_by_semantic(asset) + + # Each semantic should be in its own list + assert processed_asset.add_or_update_classifications is not None + assert len(processed_asset.add_or_update_classifications) == 1 + assert ( + str(processed_asset.add_or_update_classifications[0].type_name) + == "AppendTag" + ) + + assert processed_asset.remove_classifications is not None + assert len(processed_asset.remove_classifications) == 1 + assert str(processed_asset.remove_classifications[0].type_name) == "RemoveTag" + + assert processed_asset.atlan_tags is not None + assert len(processed_asset.atlan_tags) == 1 + assert str(processed_asset.atlan_tags[0].type_name) == "ReplaceTag" + + def test_process_tags_none_semantic(self): + """Test processing tags with None semantic (backward compatibility).""" + from pyatlan.client.common.asset import Save + + asset = Table() + asset.atlan_tags = [ + AtlanTag( # type: ignore[call-arg] + type_name=AtlanTagName("Tag1"), semantic=None + ), + ] + + processed_asset = Save._process_tags_by_semantic(asset) + + # Tags with None semantic should be treated as REPLACE + assert processed_asset.atlan_tags is not None + assert len(processed_asset.atlan_tags) == 1 + assert processed_asset.add_or_update_classifications is None + assert processed_asset.remove_classifications is None + + def test_process_tags_from_add_or_update_classifications(self): + """Test processing tags from add_or_update_classifications field.""" + from pyatlan.client.common.asset import Save + + asset = Table() + asset.add_or_update_classifications = [ + AtlanTag( # type: ignore[call-arg] + type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND + ), + ] + + processed_asset = Save._process_tags_by_semantic(asset) + + # Should stay in add_or_update_classifications + assert processed_asset.add_or_update_classifications is not None + assert len(processed_asset.add_or_update_classifications) == 1 + + def test_process_tags_from_remove_classifications(self): + """Test processing tags from remove_classifications field.""" + from pyatlan.client.common.asset import Save + + asset = Table() + asset.remove_classifications = [ + AtlanTag(type_name=AtlanTagName("Tag1")), # type: ignore[call-arg] + ] + + processed_asset = Save._process_tags_by_semantic(asset) + + # Should stay in remove_classifications + assert processed_asset.remove_classifications is not None + assert len(processed_asset.remove_classifications) == 1 + + def test_process_tags_no_tags(self): + """Test processing asset with no tags.""" + from pyatlan.client.common.asset import Save + + asset = Table() + processed_asset = Save._process_tags_by_semantic(asset) + + # All tag fields should be None + assert processed_asset.atlan_tags is None + assert processed_asset.add_or_update_classifications is None + assert processed_asset.remove_classifications is None + + @patch("pyatlan.client.common.asset.Save.validate_and_flush_entities") + def test_save_prepare_request_processes_tag_semantics( + self, mock_validate, client, mock_tag_cache + ): + """Test that Save.prepare_request processes tag semantics.""" + from pyatlan.client.common.asset import Save + + # Create a simple asset with required fields + asset = Table() + asset.qualified_name = "default/snowflake/db/schema/table" + asset.name = "test_table" + asset.atlan_tags = [ + AtlanTag( # type: ignore[call-arg] + type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.APPEND + ), + AtlanTag( # type: ignore[call-arg] + type_name=AtlanTagName("Tag2"), semantic=SaveSemantic.REMOVE + ), + ] + + query_params, bulk_request = Save.prepare_request(entity=asset, client=client) + + # Check that tags were processed + processed_asset = bulk_request.entities[0] + assert processed_asset.add_or_update_classifications is not None + assert len(processed_asset.add_or_update_classifications) == 1 + assert processed_asset.remove_classifications is not None + assert len(processed_asset.remove_classifications) == 1 + assert processed_asset.atlan_tags is None + + # Check query params - should use appendTags for APPEND/REMOVE semantics + assert query_params.get("appendTags") is True + assert ( + "replaceClassifications" not in query_params + or query_params.get("replaceClassifications") is False + ) + + @patch("pyatlan.client.common.asset.Save.validate_and_flush_entities") + def test_save_prepare_request_replace_semantic_query_params( + self, mock_validate, client, mock_tag_cache + ): + """ + Test that Save.prepare_request sets correct query params + for REPLACE semantic. + """ + from pyatlan.client.common.asset import Save + + asset = Table() + asset.qualified_name = "default/snowflake/db/schema/table" + asset.name = "test_table" + asset.atlan_tags = [ + AtlanTag( # type: ignore[call-arg] + type_name=AtlanTagName("Tag1"), semantic=SaveSemantic.REPLACE + ), + ] + + query_params, bulk_request = Save.prepare_request(entity=asset, client=client) + + # Check that tags were processed + processed_asset = bulk_request.entities[0] + assert processed_asset.atlan_tags is not None + assert len(processed_asset.atlan_tags) == 1 + + # Check query params - should use replaceClassifications + # for REPLACE semantic + assert query_params.get("replaceClassifications") is True + assert query_params.get("appendTags") is False + + @patch("pyatlan.client.common.asset.Save.validate_and_flush_entities") + def test_save_prepare_request_replace_atlan_tags_param( + self, mock_validate, client, mock_tag_cache + ): + """ + Test that Save.prepare_request respects + replace_atlan_tags parameter. + """ + from pyatlan.client.common.asset import Save + + asset = Table() + asset.qualified_name = "default/snowflake/db/schema/table" + asset.name = "test_table" + # Asset without semantic tags + + query_params, bulk_request = Save.prepare_request( + entity=asset, client=client, replace_atlan_tags=True + ) + + # Check query params - should use replaceClassifications + # when replace_atlan_tags=True + assert query_params.get("replaceClassifications") is True + assert query_params.get("appendTags") is False + + @patch("pyatlan.client.common.asset.Save.validate_and_flush_entities") + def test_save_prepare_request_append_atlan_tags_param( + self, mock_validate, client, mock_tag_cache + ): + """ + Test that Save.prepare_request respects + append_atlan_tags parameter. + """ + from pyatlan.client.common.asset import Save + + asset = Table() + asset.qualified_name = "default/snowflake/db/schema/table" + asset.name = "test_table" + # Asset without semantic tags + + query_params, bulk_request = Save.prepare_request( + entity=asset, client=client, append_atlan_tags=True + ) + + # Check query params - should use appendTags when append_atlan_tags=True + assert query_params.get("appendTags") is True + assert ( + "replaceClassifications" not in query_params + or query_params.get("replaceClassifications") is False + )