51 changes: 51 additions & 0 deletions README.md
@@ -224,6 +224,57 @@ for dataset in dataset_iter:
```


---

## Event Log

Ingestify ships a built-in event log that lets a separate service (or cron job) react to dataset lifecycle events without polling the dataset tables or coupling the two services together.

### How it works

```
Ingestify ingestion run
└── EventLogSubscriber writes to event_log table (same DB)

Consumer process (your service / cron)
└── EventLogConsumer reads new rows, calls your callback, advances cursor
```

The cursor is per-reader — multiple independent consumers can each track their own position.

### Enable the subscriber

Add one line to `config.yaml`:

```yaml
event_subscribers:
- type: ingestify.infra.event_log.EventLogSubscriber
```

That's it. The `event_log` and `reader_state` tables are created automatically in the same database as the rest of Ingestify's metadata.

### Write a consumer

```python
# run_consumer.py
from ingestify.infra.event_log import EventLogConsumer

def on_event(event_type: str, payload: dict) -> None:
if event_type == "revision_added":
dataset_id = payload["dataset_id"]
# trigger your downstream logic here

# Run once (e.g. from a cron job):
EventLogConsumer.from_config("config.yaml", reader_name="my-service").run(on_event)

# Or keep running, polling every 5 seconds:
EventLogConsumer.from_config("config.yaml", reader_name="my-service").run(on_event, poll_interval=5)
```

`from_config` reads `metadata_url` from your existing `config.yaml` — no duplicate connection strings.
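
If you prefer not to go through the config file, the consumer can also be constructed directly from a database URL. This is a sketch based on the constructor in `consumer.py`, reusing the `on_event` callback defined above; the URL below is a placeholder:

```python
from ingestify.infra.event_log import EventLogConsumer

consumer = EventLogConsumer(
    database_url="postgresql://user:pass@localhost/ingestify",  # placeholder: point at the same DB Ingestify writes to
    reader_name="my-service",
    table_prefix="",  # match metadata_options.table_prefix if you set one
)
consumer.run(on_event)
```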

`run()` returns `0` on success and `1` if a processing error occurred. On error the cursor is **not** advanced, so the failing event will be retried on the next run.
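
Because `run()` returns the exit code instead of exiting the process itself, a cron-friendly entry point can stay tiny. A sketch (the module layout is just an example):

```python
# run_consumer.py as a cron entry point: the process exit status mirrors run()'s return value
import sys

from ingestify.infra.event_log import EventLogConsumer

def on_event(event_type: str, payload: dict) -> None:
    if event_type == "revision_added":
        print("new revision for dataset", payload["dataset_id"])

if __name__ == "__main__":
    sys.exit(
        EventLogConsumer.from_config("config.yaml", reader_name="my-service").run(on_event)
    )
```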

---

## Roadmap
40 changes: 39 additions & 1 deletion docs/configuration.md
@@ -155,7 +155,7 @@ ingestion_plans:

## Event Subscribers Section (Optional)

The `event_subscribers` section defines handlers for processing data after ingestion:
The `event_subscribers` section defines handlers that are called after each dataset lifecycle event (`dataset_created`, `revision_added`, `metadata_updated`).

```yaml
event_subscribers:
@@ -166,6 +166,44 @@

- `type`: Full import path to the event subscriber class

### Built-in: EventLogSubscriber

Ingestify ships a ready-made subscriber that persists every event to an `event_log` table in the **same database** as the rest of the metadata. This makes it easy to build consumers that react to changes without polling the dataset table.

```yaml
event_subscribers:
- type: ingestify.infra.event_log.EventLogSubscriber
```

Two tables are created automatically (respecting the configured `table_prefix`):

| Table | Purpose |
|---|---|
| `event_log` | One row per domain event, with `event_type`, JSON payload, `source`, and `dataset_id` |
| `reader_state` | One row per named consumer, tracking the last processed event id |
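
For ad-hoc inspection you can also query the log directly. A sketch that reuses the table definitions from `ingestify/infra/event_log/tables.py` (assumes the default empty `table_prefix`; the connection URL is a placeholder):

```python
from sqlalchemy import create_engine, select

from ingestify.infra.event_log.tables import get_tables

engine = create_engine("sqlite:///ingestify.db")  # placeholder: use your metadata_url
event_log = get_tables("")["event_log_table"]

with engine.connect() as conn:
    # Show the ten most recent events
    rows = conn.execute(
        select(event_log.c.id, event_log.c.event_type, event_log.c.dataset_id)
        .order_by(event_log.c.id.desc())
        .limit(10)
    ).fetchall()

for row in rows:
    print(row)
```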

### Consuming events

Write a small script (run as a cron job or long-running process) that reads from the event log:

```python
from ingestify.infra.event_log import EventLogConsumer

def on_event(event_type: str, payload: dict) -> None:
if event_type == "revision_added":
trigger_downstream(payload["dataset_id"])

# Run once (cron-friendly, exits 0 on success or 1 on error):
EventLogConsumer.from_config("config.yaml", reader_name="my-service").run(on_event)

# Keep running, poll every 5 seconds:
EventLogConsumer.from_config("config.yaml", reader_name="my-service").run(on_event, poll_interval=5)
```

`reader_name` is an arbitrary string that scopes the cursor — use a different name for each independent consumer so they track their own position.
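
For example, two services can consume the same log without interfering; each keeps its own row in `reader_state`. A sketch (continuing the `on_event` callback above; the reader names are made up):

```python
# Each reader_name has its own cursor, so these consumers advance independently.
EventLogConsumer.from_config("config.yaml", reader_name="search-indexer").run(on_event)
EventLogConsumer.from_config("config.yaml", reader_name="notifier").run(on_event)
```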

`from_config` reads `metadata_url` (and `table_prefix` if set) directly from your existing config file, so there is no duplication of connection strings.

## Environment Variables and Secrets

Ingestify supports environment variable substitution with the `!ENV` YAML tag:
4 changes: 4 additions & 0 deletions ingestify/infra/event_log/__init__.py
@@ -0,0 +1,4 @@
from .consumer import EventLogConsumer
from .subscriber import EventLogSubscriber

__all__ = ["EventLogConsumer", "EventLogSubscriber"]
132 changes: 132 additions & 0 deletions ingestify/infra/event_log/consumer.py
@@ -0,0 +1,132 @@
import json
import logging
import time
from typing import Callable, Optional

from sqlalchemy import create_engine, select

from .tables import get_tables

logger = logging.getLogger(__name__)


class EventLogConsumer:
"""Cursor-based consumer for the event_log table.

Usage (run once, e.g. cron):
EventLogConsumer.from_config("ingestify.yaml", reader_name="importer").run(on_event)

Usage (keep running, poll every 5 seconds):
EventLogConsumer.from_config("ingestify.yaml", reader_name="importer").run(on_event, poll_interval=5)

Exit codes (returned by run):
0 Batch processed successfully (or nothing new).
1 A processing error occurred; cursor was NOT advanced.
"""

def __init__(self, database_url: str, reader_name: str, table_prefix: str = ""):
self._reader_name = reader_name
tables = get_tables(table_prefix)
self._metadata = tables["metadata"]
self._event_log_table = tables["event_log_table"]
self._reader_state_table = tables["reader_state_table"]
self._engine = create_engine(database_url)
self._metadata.create_all(self._engine, checkfirst=True)

@classmethod
def from_config(cls, config_file: str, reader_name: str) -> "EventLogConsumer":
from pyaml_env import parse_config

config = parse_config(config_file, default_value="")
main = config["main"]
database_url = main["metadata_url"]
table_prefix = main.get("metadata_options", {}).get("table_prefix", "")
return cls(
database_url=database_url,
reader_name=reader_name,
table_prefix=table_prefix,
)

def _ensure_reader_state(self, conn) -> None:
exists = conn.execute(
select(self._reader_state_table.c.reader_name).where(
self._reader_state_table.c.reader_name == self._reader_name
)
).fetchone()
if not exists:
conn.execute(
self._reader_state_table.insert().values(
reader_name=self._reader_name,
last_event_id=0,
)
)
conn.commit()

def _get_last_event_id(self, conn) -> int:
row = conn.execute(
select(self._reader_state_table.c.last_event_id).where(
self._reader_state_table.c.reader_name == self._reader_name
)
).fetchone()
return row[0] if row else 0

def _fetch_batch(self, conn, last_event_id: int, batch_size: int) -> list:
return conn.execute(
select(
self._event_log_table.c.id,
self._event_log_table.c.event_type,
self._event_log_table.c.payload_json,
)
.where(self._event_log_table.c.id > last_event_id)
.order_by(self._event_log_table.c.id)
.limit(batch_size)
).fetchall()

def _update_cursor(self, conn, event_id: int) -> None:
conn.execute(
self._reader_state_table.update()
.where(self._reader_state_table.c.reader_name == self._reader_name)
.values(last_event_id=event_id)
)
conn.commit()

def _run_once(self, on_event: Callable, batch_size: int = 100) -> int:
with self._engine.connect() as conn:
self._ensure_reader_state(conn)
last_id = self._get_last_event_id(conn)
rows = self._fetch_batch(conn, last_id, batch_size)

if not rows:
return 0

for event_id, event_type, payload_json in rows:
try:
payload = (
payload_json
if isinstance(payload_json, dict)
else json.loads(payload_json)
)
on_event(event_type, payload)
except Exception:
logger.exception(
"Failed to process event id=%d type=%r — cursor NOT advanced",
event_id,
event_type,
)
return 1

self._update_cursor(conn, event_id)

return 0

def run(
self,
on_event: Callable,
poll_interval: Optional[int] = None,
batch_size: int = 100,
) -> int:
while True:
exit_code = self._run_once(on_event, batch_size)
if exit_code != 0 or poll_interval is None:
return exit_code
time.sleep(poll_interval)
58 changes: 58 additions & 0 deletions ingestify/infra/event_log/subscriber.py
@@ -0,0 +1,58 @@
import logging

from ingestify.domain.models.event import Subscriber
from ingestify.utils import utcnow

from .tables import get_tables

logger = logging.getLogger(__name__)


class EventLogSubscriber(Subscriber):
"""Persists every Ingestify dataset event to the event_log table.

Uses the same database as the dataset store — no extra configuration needed.

Register in ingestify.yaml:
event_subscribers:
- type: ingestify.infra.event_log.EventLogSubscriber
"""

def __init__(self, store):
super().__init__(store)
session_provider = store.dataset_repository.session_provider
tables = get_tables(session_provider.table_prefix)
tables["metadata"].create_all(session_provider.engine, checkfirst=True)
self._engine = session_provider.engine
self._event_log_table = tables["event_log_table"]

def _write(self, event_type: str, dataset) -> None:
try:
with self._engine.connect() as conn:
conn.execute(
self._event_log_table.insert().values(
event_type=event_type,
payload_json=dataset.model_dump(
mode="json", exclude={"revisions"}
),
source=dataset.provider,
dataset_id=dataset.dataset_id,
created_at=utcnow(),
)
)
conn.commit()
except Exception:
logger.exception(
"EventLogSubscriber: failed to write event_type=%r dataset_id=%r",
event_type,
dataset.dataset_id,
)

def on_dataset_created(self, event) -> None:
self._write(type(event).event_type, event.dataset)

def on_metadata_updated(self, event) -> None:
self._write(type(event).event_type, event.dataset)

def on_revision_added(self, event) -> None:
self._write(type(event).event_type, event.dataset)
38 changes: 38 additions & 0 deletions ingestify/infra/event_log/tables.py
@@ -0,0 +1,38 @@
from sqlalchemy import BigInteger, Column, Integer, JSON, MetaData, String, Table

from ingestify.infra.store.dataset.sqlalchemy.tables import TZDateTime


def get_tables(table_prefix: str = ""):
metadata = MetaData()

event_log_table = Table(
f"{table_prefix}event_log",
metadata,
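        # 64-bit autoincrement primary key on PostgreSQL/MySQL via with_variant; plain Integer elsewhere (e.g. SQLite)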
Column(
"id",
Integer()
.with_variant(BigInteger(), "postgresql")
.with_variant(BigInteger(), "mysql"),
primary_key=True,
autoincrement=True,
),
Column("event_type", String(255), nullable=False),
Column("payload_json", JSON, nullable=False),
Column("source", String(255)),
Column("dataset_id", String(255)),
Column("created_at", TZDateTime(6)),
)

reader_state_table = Table(
f"{table_prefix}reader_state",
metadata,
Column("reader_name", String(255), primary_key=True),
Column("last_event_id", BigInteger, nullable=False),
)

return {
"metadata": metadata,
"event_log_table": event_log_table,
"reader_state_table": reader_state_table,
}