diff --git a/src/apify/_actor.py b/src/apify/_actor.py index fac2ea8b..5df4d6b3 100644 --- a/src/apify/_actor.py +++ b/src/apify/_actor.py @@ -46,6 +46,7 @@ from typing_extensions import Self + from crawlee._types import JsonSerializable from crawlee.proxy_configuration import _NewUrlFunction from apify._models import Webhook @@ -95,6 +96,8 @@ async def main() -> None: ``` """ + _ACTOR_STATE_KEY = 'APIFY_GLOBAL_STATE' + def __init__( self, configuration: Configuration | None = None, @@ -133,6 +136,9 @@ def __init__( self._apify_client: ApifyClientAsync | None = None + # Keep track of all used state stores to persist their values on exit + self._use_state_stores: set[str | None] = set() + self._is_initialized = False """Whether any Actor instance is currently initialized.""" @@ -243,6 +249,9 @@ async def finalize() -> None: await self.event_manager.__aexit__(None, None, None) await self._charging_manager_implementation.__aexit__(None, None, None) + # Persist Actor state + await self._save_actor_state() + await asyncio.wait_for(finalize(), self._cleanup_timeout.total_seconds()) self._is_initialized = False @@ -1324,6 +1333,31 @@ async def create_proxy_configuration( return proxy_configuration + async def use_state( + self, + default_value: dict[str, JsonSerializable] | None = None, + key: str | None = None, + kvs_name: str | None = None, + ) -> dict[str, JsonSerializable]: + """Easily create and manage state values. All state values are automatically persisted. + + Values can be modified by simply using the assignment operator. + + Args: + default_value: The default value to initialize the state if it is not already set. + key: The key in the key-value store where the state is stored. If not provided, a default key is used. + kvs_name: The name of the key-value store where the state is stored. If not provided, the default + key-value store associated with the Actor run is used. + """ + self._use_state_stores.add(kvs_name) + kvs = await self.open_key_value_store(name=kvs_name) + return await kvs.get_auto_saved_value(key or self._ACTOR_STATE_KEY, default_value) + + async def _save_actor_state(self) -> None: + for kvs_name in self._use_state_stores: + store = await self.open_key_value_store(name=kvs_name) + await store.persist_autosaved_values() + def _raise_if_not_initialized(self) -> None: if not self._is_initialized: raise RuntimeError('The Actor was not initialized!') diff --git a/tests/unit/actor/test_actor_key_value_store.py b/tests/unit/actor/test_actor_key_value_store.py index 66d4a6e7..b7de3e6b 100644 --- a/tests/unit/actor/test_actor_key_value_store.py +++ b/tests/unit/actor/test_actor_key_value_store.py @@ -1,5 +1,7 @@ from __future__ import annotations +import asyncio + import pytest from apify_shared.consts import ApifyEnvVars @@ -106,3 +108,79 @@ async def test_get_input_with_encrypted_secrets(monkeypatch: pytest.MonkeyPatch) assert actor_input['secret_string'] == secret_string assert actor_input['secret_object'] == secret_object assert actor_input['secret_array'] == secret_array + + +async def test_use_state(monkeypatch: pytest.MonkeyPatch) -> None: + # Set a short persist state interval to speed up the test + monkeypatch.setenv(ApifyEnvVars.PERSIST_STATE_INTERVAL_MILLIS, '100') + async with Actor as actor: + state = await actor.use_state() + assert state == {} + + state['state'] = 'first_state' + + await asyncio.sleep(0.2) # Wait for the state to be persisted + + kvs = await actor.open_key_value_store() + stored_state = await kvs.get_value('CRAWLEE_STATE_0') + assert stored_state == {'state': 'first_state'} + + state['state'] = 'finished_state' + + saved_sate = await kvs.get_value('CRAWLEE_STATE_0') + assert saved_sate == {'state': 'finished_state'} + + +async def test_use_state_non_default(monkeypatch: pytest.MonkeyPatch) -> None: + # Set a short persist state interval to speed up the test + monkeypatch.setenv(ApifyEnvVars.PERSIST_STATE_INTERVAL_MILLIS, '100') + async with Actor as actor: + state = await actor.use_state( + default_value={'state': 'initial_state'}, key='custom_state_key', kvs_name='custom-kvs' + ) + assert state == {'state': 'initial_state'} + + state['state'] = 'first_state' + + await asyncio.sleep(0.2) # Wait for the state to be persisted + + kvs = await actor.open_key_value_store(name='custom-kvs') + stored_state = await kvs.get_value('custom_state_key') + assert stored_state == {'state': 'first_state'} + + state['state'] = 'finished_state' + + saved_sate = await kvs.get_value('custom_state_key') + assert saved_sate == {'state': 'finished_state'} + + +async def test_use_state_persists_on_actor_stop() -> None: + async with Actor as actor: + state = await actor.use_state() + assert state == {} + + kvs = await actor.open_key_value_store() + + state['state'] = 'finished_state' + + # After Actor context is exited, the state should be persisted + saved_sate = await kvs.get_value('CRAWLEE_STATE_0') + assert saved_sate == {'state': 'finished_state'} + + +async def test_use_state_with_multiple_stores() -> None: + async with Actor as actor: + state_default = await actor.use_state() + state_custom = await actor.use_state(kvs_name='custom-kvs') + + state_default['value'] = 'default_store' + state_custom['value'] = 'custom_store' + + kvs_default = await actor.open_key_value_store() + kvs_custom = await actor.open_key_value_store(name='custom-kvs') + + saved_state_default = await kvs_default.get_value('CRAWLEE_STATE_0') + assert saved_state_default == {'value': 'default_store'} + + saved_state_custom = await kvs_custom.get_value('CRAWLEE_STATE_0') + assert saved_state_custom == {'value': 'custom_store'}