diff --git a/README.md b/README.md
index 28b6e7f..c9f3794 100644
--- a/README.md
+++ b/README.md
@@ -224,6 +224,57 @@ for dataset in dataset_iter:
 ```
 
+---
+
+## Event Log
+
+Ingestify ships a built-in event log that lets a separate service (or cron job) react to dataset lifecycle events — without polling the database or coupling services together.
+
+### How it works
+
+```
+Ingestify ingestion run
+  └── EventLogSubscriber writes to event_log table (same DB)
+
+Consumer process (your service / cron)
+  └── EventLogConsumer reads new rows, calls your callback, advances cursor
+```
+
+The cursor is per-reader — multiple independent consumers can each track their own position.
+
+### Enable the subscriber
+
+Add one line to `config.yaml`:
+
+```yaml
+event_subscribers:
+  - type: ingestify.infra.event_log.EventLogSubscriber
+```
+
+That's it. The `event_log` and `reader_state` tables are created automatically in the same database as the rest of ingestify.
+
+### Write a consumer
+
+```python
+# run_consumer.py
+from ingestify.infra.event_log import EventLogConsumer
+
+def on_event(event_type: str, payload: dict) -> None:
+    if event_type == "revision_added":
+        dataset_id = payload["dataset_id"]
+        # trigger your downstream logic here
+
+# Run once (e.g. from a cron job):
+EventLogConsumer.from_config("config.yaml", reader_name="my-service").run(on_event)
+
+# Or keep running, polling every 5 seconds:
+EventLogConsumer.from_config("config.yaml", reader_name="my-service").run(on_event, poll_interval=5)
+```
+
+`from_config` reads `metadata_url` from your existing `config.yaml` — no duplicate connection strings.
+
+`run()` returns `0` on success and `1` if a processing error occurred. On error the cursor is **not** advanced, so the failing event will be retried on the next run.
+
 ---
 
 ## Roadmap
 
diff --git a/docs/configuration.md b/docs/configuration.md
index 43697cd..0a7e637 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -155,7 +155,7 @@ ingestion_plans:
 ## Event Subscribers Section (Optional)
 
-The `event_subscribers` section defines handlers for processing data after ingestion:
+The `event_subscribers` section defines handlers that are called after each dataset lifecycle event (`dataset_created`, `revision_added`, `metadata_updated`).
 
 ```yaml
 event_subscribers:
   - type: path.to.subscriber.Class
@@ -166,6 +166,44 @@ event_subscribers:
 
 - `type`: Full import path to the event subscriber class
 
+### Built-in: EventLogSubscriber
+
+Ingestify ships a ready-made subscriber that persists every event to an `event_log` table in the **same database** as the rest of the metadata. This makes it easy to build consumers that react to changes without polling the dataset table.
+
+```yaml
+event_subscribers:
+  - type: ingestify.infra.event_log.EventLogSubscriber
+```
+
+Two tables are created automatically (respecting the configured `table_prefix`):
+
+| Table | Purpose |
+|---|---|
+| `event_log` | One row per domain event, with `event_type`, JSON payload, `source`, and `dataset_id` |
+| `reader_state` | One row per named consumer, tracking the last processed event id |
+
+### Consuming events
+
+Write a small script (run as a cron job or long-running process) that reads from the event log:
+
+```python
+from ingestify.infra.event_log import EventLogConsumer
+
+def on_event(event_type: str, payload: dict) -> None:
+    if event_type == "revision_added":
+        trigger_downstream(payload["dataset_id"])
+
+# Run once (cron-friendly, exits 0 on success or 1 on error):
+EventLogConsumer.from_config("config.yaml", reader_name="my-service").run(on_event)
+
+# Keep running, poll every 5 seconds:
+EventLogConsumer.from_config("config.yaml", reader_name="my-service").run(on_event, poll_interval=5)
+```
+
+`reader_name` is an arbitrary string that scopes the cursor — use a different name for each independent consumer so they track their own position.
+
+`from_config` reads `metadata_url` (and `table_prefix` if set) directly from your existing config file, so there is no duplication of connection strings.
+
 ## Environment Variables and Secrets
 
 Ingestify supports environment variable substitution with the `!ENV` YAML tag:
diff --git a/ingestify/infra/event_log/__init__.py b/ingestify/infra/event_log/__init__.py
new file mode 100644
index 0000000..76b9acc
--- /dev/null
+++ b/ingestify/infra/event_log/__init__.py
@@ -0,0 +1,4 @@
+from .consumer import EventLogConsumer
+from .subscriber import EventLogSubscriber
+
+__all__ = ["EventLogConsumer", "EventLogSubscriber"]
diff --git a/ingestify/infra/event_log/consumer.py b/ingestify/infra/event_log/consumer.py
new file mode 100644
index 0000000..b7d54ec
--- /dev/null
+++ b/ingestify/infra/event_log/consumer.py
@@ -0,0 +1,132 @@
+import json
+import logging
+import time
+from typing import Callable, Optional
+
+from sqlalchemy import create_engine, select
+
+from .tables import get_tables
+
+logger = logging.getLogger(__name__)
+
+
+class EventLogConsumer:
+    """Cursor-based consumer for the event_log table.
+
+    Usage (run once, e.g. cron):
+        EventLogConsumer.from_config("ingestify.yaml", reader_name="importer").run(on_event)
+
+    Usage (keep running, poll every 5 seconds):
+        EventLogConsumer.from_config("ingestify.yaml", reader_name="importer").run(on_event, poll_interval=5)
+
+    Exit codes (returned by run):
+        0   Batch processed successfully (or nothing new).
+        1   A processing error occurred; cursor was NOT advanced.
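+
+    Each reader_name keeps its own cursor row in reader_state, so several
+    independent consumers can read the same event_log without interfering.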
+ """ + + def __init__(self, database_url: str, reader_name: str, table_prefix: str = ""): + self._reader_name = reader_name + tables = get_tables(table_prefix) + self._metadata = tables["metadata"] + self._event_log_table = tables["event_log_table"] + self._reader_state_table = tables["reader_state_table"] + self._engine = create_engine(database_url) + self._metadata.create_all(self._engine, checkfirst=True) + + @classmethod + def from_config(cls, config_file: str, reader_name: str) -> "EventLogConsumer": + from pyaml_env import parse_config + + config = parse_config(config_file, default_value="") + main = config["main"] + database_url = main["metadata_url"] + table_prefix = main.get("metadata_options", {}).get("table_prefix", "") + return cls( + database_url=database_url, + reader_name=reader_name, + table_prefix=table_prefix, + ) + + def _ensure_reader_state(self, conn) -> None: + exists = conn.execute( + select(self._reader_state_table.c.reader_name).where( + self._reader_state_table.c.reader_name == self._reader_name + ) + ).fetchone() + if not exists: + conn.execute( + self._reader_state_table.insert().values( + reader_name=self._reader_name, + last_event_id=0, + ) + ) + conn.commit() + + def _get_last_event_id(self, conn) -> int: + row = conn.execute( + select(self._reader_state_table.c.last_event_id).where( + self._reader_state_table.c.reader_name == self._reader_name + ) + ).fetchone() + return row[0] if row else 0 + + def _fetch_batch(self, conn, last_event_id: int, batch_size: int) -> list: + return conn.execute( + select( + self._event_log_table.c.id, + self._event_log_table.c.event_type, + self._event_log_table.c.payload_json, + ) + .where(self._event_log_table.c.id > last_event_id) + .order_by(self._event_log_table.c.id) + .limit(batch_size) + ).fetchall() + + def _update_cursor(self, conn, event_id: int) -> None: + conn.execute( + self._reader_state_table.update() + .where(self._reader_state_table.c.reader_name == self._reader_name) + .values(last_event_id=event_id) + ) + conn.commit() + + def _run_once(self, on_event: Callable, batch_size: int = 100) -> int: + with self._engine.connect() as conn: + self._ensure_reader_state(conn) + last_id = self._get_last_event_id(conn) + rows = self._fetch_batch(conn, last_id, batch_size) + + if not rows: + return 0 + + for event_id, event_type, payload_json in rows: + try: + payload = ( + payload_json + if isinstance(payload_json, dict) + else json.loads(payload_json) + ) + on_event(event_type, payload) + except Exception: + logger.exception( + "Failed to process event id=%d type=%r — cursor NOT advanced", + event_id, + event_type, + ) + return 1 + + self._update_cursor(conn, event_id) + + return 0 + + def run( + self, + on_event: Callable, + poll_interval: Optional[int] = None, + batch_size: int = 100, + ) -> int: + while True: + exit_code = self._run_once(on_event, batch_size) + if exit_code != 0 or poll_interval is None: + return exit_code + time.sleep(poll_interval) diff --git a/ingestify/infra/event_log/subscriber.py b/ingestify/infra/event_log/subscriber.py new file mode 100644 index 0000000..680462f --- /dev/null +++ b/ingestify/infra/event_log/subscriber.py @@ -0,0 +1,58 @@ +import logging + +from ingestify.domain.models.event import Subscriber +from ingestify.utils import utcnow + +from .tables import get_tables + +logger = logging.getLogger(__name__) + + +class EventLogSubscriber(Subscriber): + """Persists every Ingestify dataset event to the event_log table. 
+
+    Uses the same database as the dataset store — no extra configuration needed.
+
+    Register in ingestify.yaml:
+        event_subscribers:
+          - type: ingestify.infra.event_log.EventLogSubscriber
+    """
+
+    def __init__(self, store):
+        super().__init__(store)
+        session_provider = store.dataset_repository.session_provider
+        tables = get_tables(session_provider.table_prefix)
+        tables["metadata"].create_all(session_provider.engine, checkfirst=True)
+        self._engine = session_provider.engine
+        self._event_log_table = tables["event_log_table"]
+
+    def _write(self, event_type: str, dataset) -> None:
+        try:
+            with self._engine.connect() as conn:
+                conn.execute(
+                    self._event_log_table.insert().values(
+                        event_type=event_type,
+                        payload_json=dataset.model_dump(
+                            mode="json", exclude={"revisions"}
+                        ),
+                        source=dataset.provider,
+                        dataset_id=dataset.dataset_id,
+                        created_at=utcnow(),
+                    )
+                )
+                conn.commit()
+        except Exception:
+            logger.exception(
+                "EventLogSubscriber: failed to write event_type=%r dataset_id=%r",
+                event_type,
+                dataset.dataset_id,
+            )
+
+    def on_dataset_created(self, event) -> None:
+        self._write(type(event).event_type, event.dataset)
+
+    def on_metadata_updated(self, event) -> None:
+        self._write(type(event).event_type, event.dataset)
+
+    def on_revision_added(self, event) -> None:
+        self._write(type(event).event_type, event.dataset)
diff --git a/ingestify/infra/event_log/tables.py b/ingestify/infra/event_log/tables.py
new file mode 100644
index 0000000..dff3b5f
--- /dev/null
+++ b/ingestify/infra/event_log/tables.py
@@ -0,0 +1,38 @@
+from sqlalchemy import BigInteger, Column, Integer, JSON, MetaData, String, Table
+
+from ingestify.infra.store.dataset.sqlalchemy.tables import TZDateTime
+
+
+def get_tables(table_prefix: str = ""):
+    metadata = MetaData()
+
+    event_log_table = Table(
+        f"{table_prefix}event_log",
+        metadata,
+        Column(
+            "id",
+            Integer()
+            .with_variant(BigInteger(), "postgresql")
+            .with_variant(BigInteger(), "mysql"),
+            primary_key=True,
+            autoincrement=True,
+        ),
+        Column("event_type", String(255), nullable=False),
+        Column("payload_json", JSON, nullable=False),
+        Column("source", String(255)),
+        Column("dataset_id", String(255)),
+        Column("created_at", TZDateTime(6)),
+    )
+
+    reader_state_table = Table(
+        f"{table_prefix}reader_state",
+        metadata,
+        Column("reader_name", String(255), primary_key=True),
+        Column("last_event_id", BigInteger, nullable=False),
+    )
+
+    return {
+        "metadata": metadata,
+        "event_log_table": event_log_table,
+        "reader_state_table": reader_state_table,
+    }
diff --git a/ingestify/tests/test_event_log.py b/ingestify/tests/test_event_log.py
new file mode 100644
index 0000000..cbd4b34
--- /dev/null
+++ b/ingestify/tests/test_event_log.py
@@ -0,0 +1,179 @@
+from unittest.mock import MagicMock
+
+import pytest
+from sqlalchemy import create_engine
+
+from ingestify.infra.event_log.consumer import EventLogConsumer
+from ingestify.infra.event_log.subscriber import EventLogSubscriber
+from ingestify.infra.event_log.tables import get_tables
+from ingestify.utils import utcnow
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+
+def make_consumer() -> EventLogConsumer:
+    consumer = EventLogConsumer("sqlite:///:memory:", reader_name="test")
+    return consumer
+
+
+def insert_events(consumer: EventLogConsumer, *events):
+    """Insert (event_type, payload) tuples directly into the event_log table."""
+    with consumer._engine.connect() as conn:
+        for event_type, payload in events:
+            conn.execute(
+                consumer._event_log_table.insert().values(
+                    event_type=event_type,
+                    payload_json=payload,
+                    created_at=utcnow(),
+                )
+            )
+        conn.commit()
+
+
+def make_subscriber() -> EventLogSubscriber:
+    engine = create_engine("sqlite:///:memory:")
+    mock_store = MagicMock()
+    mock_store.dataset_repository.session_provider.table_prefix = ""
+    mock_store.dataset_repository.session_provider.engine = engine
+    return EventLogSubscriber(mock_store)
+
+
+# ---------------------------------------------------------------------------
+# Consumer tests
+# ---------------------------------------------------------------------------
+
+
+def test_processes_events_in_order():
+    consumer = make_consumer()
+    insert_events(
+        consumer,
+        ("dataset_created", {"dataset_id": "a"}),
+        ("revision_added", {"dataset_id": "b"}),
+        ("revision_added", {"dataset_id": "c"}),
+    )
+
+    processed = []
+    consumer._run_once(lambda et, p: processed.append(p["dataset_id"]))
+
+    assert processed == ["a", "b", "c"]
+
+
+def test_cursor_advanced_after_each_event():
+    consumer = make_consumer()
+    insert_events(
+        consumer,
+        ("dataset_created", {"dataset_id": "a"}),
+        ("revision_added", {"dataset_id": "b"}),
+    )
+
+    cursors = []
+    original = consumer._update_cursor
+
+    def capture(conn, event_id):
+        cursors.append(event_id)
+        original(conn, event_id)
+
+    consumer._update_cursor = capture
+    consumer._run_once(lambda et, p: None)
+
+    assert len(cursors) == 2
+    assert cursors[0] < cursors[1]
+
+
+def test_cursor_not_advanced_on_error():
+    consumer = make_consumer()
+    insert_events(consumer, ("dataset_created", {"dataset_id": "a"}))
+
+    cursors = []
+    original = consumer._update_cursor
+
+    def capture(conn, event_id):
+        cursors.append(event_id)
+        original(conn, event_id)
+
+    consumer._update_cursor = capture
+    exit_code = consumer._run_once(
+        lambda et, p: (_ for _ in ()).throw(RuntimeError("boom"))
+    )
+
+    assert exit_code == 1
+    assert cursors == []
+
+
+def test_no_events_returns_zero():
+    consumer = make_consumer()
+    exit_code = consumer._run_once(lambda et, p: None)
+    assert exit_code == 0
+
+
+def test_only_new_events_processed_after_cursor():
+    consumer = make_consumer()
+    insert_events(
+        consumer,
+        ("dataset_created", {"dataset_id": "a"}),
+        ("revision_added", {"dataset_id": "b"}),
+    )
+
+    # consume first batch
+    consumer._run_once(lambda et, p: None)
+
+    # insert a new event
+    insert_events(consumer, ("revision_added", {"dataset_id": "c"}))
+
+    processed = []
+    consumer._run_once(lambda et, p: processed.append(p["dataset_id"]))
+
+    assert processed == ["c"]
+
+
+# ---------------------------------------------------------------------------
+# Subscriber tests
+# ---------------------------------------------------------------------------
+
+
+def make_dataset(dataset_id="ds1", provider="test"):
+    dataset = MagicMock()
+    dataset.dataset_id = dataset_id
+    dataset.provider = provider
+    dataset.model_dump.return_value = {"dataset_id": dataset_id, "provider": provider}
+    return dataset
+
+
+def make_event(event_type, dataset):
+    event = MagicMock()
+    type(event).event_type = event_type
+    event.dataset = dataset
+    return event
+
+
+def test_subscriber_writes_event():
+    subscriber = make_subscriber()
+    subscriber.on_dataset_created(make_event("dataset_created", make_dataset()))
+
+    with subscriber._engine.connect() as conn:
+        rows = conn.execute(subscriber._event_log_table.select()).fetchall()
+
+    assert len(rows) == 1
+    assert rows[0].event_type == "dataset_created"
+    assert rows[0].dataset_id == "ds1"
+
+
+def test_subscriber_writes_all_event_types():
+    subscriber = make_subscriber()
+    dataset = make_dataset()
+
+    subscriber.on_dataset_created(make_event("dataset_created", dataset))
+    subscriber.on_revision_added(make_event("revision_added", dataset))
+    subscriber.on_metadata_updated(make_event("metadata_updated", dataset))
+
+    with subscriber._engine.connect() as conn:
+        rows = conn.execute(subscriber._event_log_table.select()).fetchall()
+
+    assert [r.event_type for r in rows] == [
+        "dataset_created",
+        "revision_added",
+        "metadata_updated",
+    ]
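+
+
+def test_each_reader_tracks_its_own_cursor(tmp_path):
+    # Illustrative sketch (not part of the original suite): the docs state the
+    # cursor is per-reader, so two consumers sharing one database file but using
+    # different reader_name values should each see every event independently.
+    url = f"sqlite:///{tmp_path}/event_log.db"
+    first = EventLogConsumer(url, reader_name="reader-a")
+    second = EventLogConsumer(url, reader_name="reader-b")
+
+    insert_events(first, ("dataset_created", {"dataset_id": "a"}))
+
+    seen_by_first, seen_by_second = [], []
+    assert first._run_once(lambda et, p: seen_by_first.append(p["dataset_id"])) == 0
+    assert second._run_once(lambda et, p: seen_by_second.append(p["dataset_id"])) == 0
+
+    assert seen_by_first == ["a"]
+    assert seen_by_second == ["a"]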