2 changes: 2 additions & 0 deletions .gitignore
@@ -193,3 +193,5 @@ pyrightconfig.json
.ionide

# End of https://www.toptal.com/developers/gitignore/api/python,visualstudiocode

.claude/
1 change: 1 addition & 0 deletions README.md
@@ -54,6 +54,7 @@ Deploy Memgraph using methods that suit your environment, whether it's container

### Import
- [PySpark integration (CSV -> PySpark -> Memgraph)](./pyspark_integration/)
- [Importing data from Apache Iceberg (PyIceberg)](./import/iceberg/)
- [Importing data from Arrow Flight](./import/migrate/arrow-flight/)
- [Importing data with DuckDB](./import/migrate/duckdb/)
- [Importing data from Amazon Aurora/MySQL](./import/migrate/amazon_aurora/)
2 changes: 2 additions & 0 deletions import/iceberg/.gitignore
@@ -0,0 +1,2 @@
warehouse/
.venv/
162 changes: 162 additions & 0 deletions import/iceberg/README.md
@@ -0,0 +1,162 @@
# Loading Apache Iceberg into Memgraph

Ingest data from [Apache Iceberg](https://iceberg.apache.org/) tables into Memgraph as a property graph.

- **Edition**: Community
- **Tested on**: Memgraph **v3.9**
- **Source backends**: PyIceberg, DuckDB (more coming: LOAD CSV/Parquet)

## What this example does

1. [`generate_iceberg.py`](./generate_iceberg.py) generates **1,000,000 users** and **5,000,000 transactions** with numpy and writes them to a local Iceberg warehouse via a PyIceberg `SqlCatalog` (SQLite metadata, local-filesystem warehouse). Zero infra. Sizes are configurable via `--users` / `--transactions`.
2. [`iceberg_to_memgraph.py`](./iceberg_to_memgraph.py) reads those Iceberg tables and writes them to Memgraph as `(:User)-[:SENT]->(:User)` using batched `UNWIND` queries via [gqlalchemy](https://github.com/memgraph/gqlalchemy) (Memgraph's Python client, built on `pymgclient`).

The reported elapsed time covers only the source → Memgraph ingestion (schema setup and graph reset are excluded), so the number reflects actual data movement.
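
To make the write path concrete, here is a minimal sketch of that batched `UNWIND` pattern via gqlalchemy, using property names taken from the generated schema and the verify queries below (`user_id`, `name`, `amount`, `ts`); the exact Cypher and helpers in [`iceberg_to_memgraph.py`](./iceberg_to_memgraph.py) may differ:

```python
from gqlalchemy import Memgraph

memgraph = Memgraph(host="127.0.0.1", port=7687)

def write_user_batch(rows):
    # rows: list of dicts, e.g. {"user_id": 1, "name": "User_0000001", ...}
    memgraph.execute(
        "UNWIND $rows AS row "
        "CREATE (:User {user_id: row.user_id, name: row.name, "
        "email: row.email, country: row.country})",
        {"rows": rows},
    )

def write_tx_batch(rows):
    # rows: list of dicts with from_user / to_user / amount / timestamp
    memgraph.execute(
        "UNWIND $rows AS row "
        "MATCH (a:User {user_id: row.from_user}), (b:User {user_id: row.to_user}) "
        "CREATE (a)-[:SENT {amount: row.amount, ts: row.timestamp}]->(b)",
        {"rows": rows},
    )
```

One `execute()` call per batch keeps the number of Bolt round trips at roughly `rows / batch_size` instead of one per row, which is what makes the batching pay off.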

## Why a local Iceberg catalog?

In production, Iceberg tables live in a data lake (S3 + AWS Glue / REST / Hive catalog). The local SQL catalog used here is a reproducible stand-in so the example runs end-to-end on a fresh clone — only the catalog config block in [`loaders/pyiceberg_loader.py`](./loaders/pyiceberg_loader.py) changes when you point at a real lake.
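
A minimal sketch of the read side with PyIceberg, mirroring the local catalog layout that `generate_iceberg.py` creates (the real loader lives in [`loaders/pyiceberg_loader.py`](./loaders/pyiceberg_loader.py) and may be structured differently; `to_arrow_batch_reader()` needs a recent PyIceberg release, older ones only offer `to_arrow()`):

```python
from pathlib import Path
from pyiceberg.catalog.sql import SqlCatalog

WAREHOUSE = Path(__file__).resolve().parent / "warehouse"

# Same local layout generate_iceberg.py creates: SQLite metadata + file:// warehouse.
catalog = SqlCatalog(
    "default",
    uri=f"sqlite:///{WAREHOUSE}/catalog.db",
    warehouse=f"file://{WAREHOUSE}",
)

users = catalog.load_table("default.users")

# Stream Arrow record batches instead of materializing the whole table;
# each batch becomes a list of dicts for the Memgraph writer.
for batch in users.scan().to_arrow_batch_reader():
    rows = batch.to_pylist()
    ...  # hand rows to the batched UNWIND writer
```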

## Why gqlalchemy and not the official `neo4j` driver?

**Use [gqlalchemy](https://github.com/memgraph/gqlalchemy) when writing to Memgraph.** In our tests on this workload (1M users + 5M transactions, batched UNWIND), gqlalchemy delivered **at least a 2× ingestion throughput improvement** over the `neo4j` Python driver at the same `--workers` and `--batch-size`.

Why: gqlalchemy talks to Memgraph through [`pymgclient`](https://github.com/memgraph/pymgclient), a native C client built specifically for Memgraph's Bolt implementation. The `neo4j` driver is generic Bolt and adds per-call overhead (transactional bookmarking, server-routing checks, result-buffering) that bulk loads pay for on every batch but don't benefit from. For write-heavy ingestion via `UNWIND`, that overhead dominates.

Rule of thumb for this repo:
- **Writing to Memgraph** (ingest, ETL, bulk load): use gqlalchemy.
- **Read-mostly app code** where you want OGM ergonomics (Python classes ↔ nodes, query builder): also gqlalchemy.
- **Cross-vendor code that must run against Neo4j too**: use the `neo4j` driver.
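
For the cross-vendor case, the same batched write through the official `neo4j` driver looks roughly like this (shown only for comparison; per the numbers above it is the slower option for Memgraph ingestion, and `user_batches` is a placeholder for whatever your source yields):

```python
from neo4j import GraphDatabase

# The Memgraph started by this example has no auth configured.
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("", ""))

with driver.session() as session:
    for rows in user_batches:  # placeholder: batches of user dicts
        session.run(
            "UNWIND $rows AS row "
            "CREATE (:User {user_id: row.user_id, name: row.name, "
            "email: row.email, country: row.country})",
            rows=rows,
        )
driver.close()
```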

## Run

### 1. Start Memgraph

```bash
docker compose up -d
```

Starts Memgraph 3.9 (Community) on `bolt://localhost:7687`. To inspect results visually, also run [Memgraph Lab](https://memgraph.com/docs/data-visualization).

### 2. Install dependencies

This example uses [uv](https://docs.astral.sh/uv/). Install it once if you don't have it (`curl -LsSf https://astral.sh/uv/install.sh | sh`), then sync the project env:

```bash
uv sync
```

### 3. Generate the Iceberg warehouse (once)

```bash
uv run python generate_iceberg.py
```

Expected output:

```
Generating 1,000,000 users...
Generating 5,000,000 transactions...
Writing Iceberg tables...

users: 1,000,000 rows
transactions: 5,000,000 rows
warehouse: .../import/iceberg/warehouse
```

Generation takes ~10–20s and produces a few hundred MB of Parquet under `warehouse/`. Re-running drops and recreates the tables.

For a quick smoke test, scale down: `uv run python generate_iceberg.py --users 10000 --transactions 50000`.

### 4. Load Iceberg into Memgraph

Pick a source backend; optionally tune parallelism and batch size:

```bash
uv run python iceberg_to_memgraph.py --source pyiceberg # defaults
uv run python iceberg_to_memgraph.py --source duckdb --workers 8 # 8 parallel writers
uv run python iceberg_to_memgraph.py --source pyiceberg --workers 8 --batch-size 50000
```

Expected output:

```
[pyiceberg, workers=1, batch=10000, user-write=create] Ingested into Memgraph in 142.37s
[duckdb, workers=8, batch=10000, user-write=create] Ingested into Memgraph in 31.04s
[pyiceberg, workers=20, batch=25000, user-write=create] Ingested into Memgraph in 24.34s
```

Flags:

- `--workers N` — N parallel Bolt sessions writing to Memgraph. A single reader thread fills a bounded queue; N writer threads drain it. Memgraph is in analytical mode for the duration (set by `prepare_graph`), so concurrent writes don't conflict.
- `--batch-size N` — rows per UNWIND batch / Bolt round trip (default 10000). Bigger batches amortize round-trip overhead but increase per-call memory use; smaller batches expose more concurrency to the workers but spend more time on network framing.

Tuning order: get `--batch-size` into the right ballpark first (1k–50k is typical), then crank `--workers` up to roughly the CPU count of the server. Comparing `pyiceberg` vs `duckdb` at equal `--workers` and `--batch-size` isolates the scan engine difference; the writer side is identical.

Exact time depends on hardware. The default sizes (1M nodes + 5M edges) typically ingest in 2–5 minutes single-threaded and noticeably faster with `--workers 4..16`.
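
Under the hood, `--workers` and `--batch-size` drive the single-reader, N-writer pattern described above. A minimal sketch of that pattern with hypothetical helper names (`batches`, `write_batch`); the actual code in [`iceberg_to_memgraph.py`](./iceberg_to_memgraph.py) may differ:

```python
import queue
import threading

def ingest(batches, write_batch, workers=8, max_queued=32):
    # One reader fills a bounded queue (backpressure when writers fall behind);
    # N writer threads drain it, each over its own Bolt session.
    q = queue.Queue(maxsize=max_queued)
    SENTINEL = object()

    def writer():
        while True:
            batch = q.get()
            if batch is SENTINEL:
                break
            write_batch(batch)  # one batched UNWIND per call

    threads = [threading.Thread(target=writer) for _ in range(workers)]
    for t in threads:
        t.start()

    for batch in batches:   # e.g. loader.transactions()
        q.put(batch)
    for _ in threads:
        q.put(SENTINEL)     # one stop marker per writer
    for t in threads:
        t.join()
```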

## Verify

Connect via Memgraph Lab or `mgconsole` and run:

```cypher
MATCH (u:User)-[t:SENT]->(v:User)
RETURN u.name AS sender, v.name AS receiver, t.amount, t.ts
ORDER BY t.amount DESC
LIMIT 10;
```

You should see the top 10 transactions with sender/receiver names. To verify counts:

```cypher
MATCH (u:User) RETURN count(u); // 1,000,000
MATCH ()-[t:SENT]->() RETURN count(t); // 5,000,000
```

## Architecture

```
generate_iceberg.py ──► warehouse/ (local Iceberg, 1M users + 5M txs)
loaders/{pyiceberg,duckdb}_loader.py
iceberg_to_memgraph.py ──► Memgraph (Bolt)
```

[`iceberg_to_memgraph.py`](./iceberg_to_memgraph.py) is backend-agnostic — it talks to the `Loader` interface defined in [`loaders/base.py`](./loaders/base.py).

### Adding a new source backend

1. Implement `loaders/<your>.py` with `users()` and `transactions()` returning iterators of dict batches.
2. Register it in [`loaders/__init__.py`](./loaders/__init__.py) under `LOADERS`.
3. Run `python iceberg_to_memgraph.py --source <your>`.
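
A hypothetical skeleton for step 1 (class name and constructor arguments are illustrative; match the actual `Loader` signatures in [`loaders/base.py`](./loaders/base.py)):

```python
# loaders/csv_loader.py (illustrative only)
import csv
from typing import Dict, Iterator, List

class CsvLoader:
    """Yields users and transactions as batches of dicts, like the existing loaders."""

    def __init__(self, users_path: str, txs_path: str, batch_size: int = 10_000):
        self.users_path = users_path
        self.txs_path = txs_path
        self.batch_size = batch_size

    def _batches(self, path: str) -> Iterator[List[Dict]]:
        with open(path, newline="") as f:
            batch: List[Dict] = []
            for row in csv.DictReader(f):
                batch.append(row)
                if len(batch) >= self.batch_size:
                    yield batch
                    batch = []
            if batch:
                yield batch

    def users(self) -> Iterator[List[Dict]]:
        return self._batches(self.users_path)

    def transactions(self) -> Iterator[List[Dict]]:
        return self._batches(self.txs_path)
```

Once registered under a name in `LOADERS`, it becomes selectable via `--source`.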

### Pointing at a real Iceberg lake

Replace the `SqlCatalog(...)` block in [`loaders/pyiceberg_loader.py`](./loaders/pyiceberg_loader.py) with a configured catalog, e.g.:

```python
from pyiceberg.catalog import load_catalog

self.catalog = load_catalog("prod", **{
"type": "rest",
"uri": "https://catalog.example.com",
# ...credentials, warehouse, etc.
})
```

The rest of the loader and the entire writer stay the same.

## Files

| File | Purpose |
| --- | --- |
| [`generate_iceberg.py`](./generate_iceberg.py) | One-time: generate 1M users + 5M txs into local Iceberg tables |
| [`loaders/base.py`](./loaders/base.py) | Abstract `Loader` interface |
| [`loaders/pyiceberg_loader.py`](./loaders/pyiceberg_loader.py) | PyIceberg implementation |
| [`loaders/duckdb_loader.py`](./loaders/duckdb_loader.py) | DuckDB `iceberg_scan` implementation |
| [`iceberg_to_memgraph.py`](./iceberg_to_memgraph.py) | Backend-agnostic CLI loader |
| [`docker-compose.yml`](./docker-compose.yml) | Memgraph 3.9 (Community) |
| `warehouse/` | Generated by `generate_iceberg.py`, gitignored |
13 changes: 13 additions & 0 deletions import/iceberg/docker-compose.yml
@@ -0,0 +1,13 @@
services:
memgraph:
image: memgraph/memgraph-mage:3.9.0
container_name: iceberg-memgraph
ports:
- "7687:7687"
- "7444:7444"
command:
- "--log-level=TRACE"
- "--also-log-to-stderr=true"
- "--query-execution-timeout-sec=0"
- "--storage-snapshot-interval-sec=0"
- "--storage-wal-enabled=false"
181 changes: 181 additions & 0 deletions import/iceberg/generate_iceberg.py
@@ -0,0 +1,181 @@
"""Generate users + transactions and write them to a local Iceberg warehouse.

Defaults: 1,000,000 users and 5,000,000 transactions.

Generation is vectorized with numpy, so producing 1M+5M rows takes only a
few seconds. Random seeds are fixed, so runs are deterministic.

In production your Iceberg tables already live in your data lake (S3 + Glue
or REST catalog). This script just creates a reproducible stand-in so the
example runs end-to-end on a fresh clone — only the catalog config block
below changes when you point at a real lake.
"""
import argparse
from pathlib import Path

import numpy as np
import pyarrow as pa
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError
from pyiceberg.io.pyarrow import schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.types import (
DoubleType,
LongType,
NestedField,
StringType,
TimestampType,
)

ROOT = Path(__file__).resolve().parent
WAREHOUSE = ROOT / "warehouse"
NAMESPACE = "default"

DEFAULT_USERS = 1_000_000
DEFAULT_TXS = 5_000_000

COUNTRIES = ["US", "UK", "DE", "FR", "JP", "ES", "IT", "BR", "IN", "CA"]

USERS_SCHEMA = Schema(
NestedField(1, "user_id", LongType(), required=True),
NestedField(2, "name", StringType()),
NestedField(3, "email", StringType()),
NestedField(4, "country", StringType()),
)

TX_SCHEMA = Schema(
NestedField(1, "tx_id", LongType(), required=True),
NestedField(2, "from_user", LongType(), required=True),
NestedField(3, "to_user", LongType(), required=True),
NestedField(4, "amount", DoubleType()),
NestedField(5, "timestamp", TimestampType()),
)


def generate_users(n: int, seed: int = 42) -> pa.Table:
rng = np.random.default_rng(seed)
user_ids = np.arange(1, n + 1, dtype=np.int64)
countries = np.array(COUNTRIES)[rng.integers(0, len(COUNTRIES), size=n)]

# Synthetic but unique per user. List comp on 1M rows takes ~1s.
names = [f"User_{i:07d}" for i in user_ids]
emails = [f"user{i}@example.com" for i in user_ids]

# Derive the PyArrow schema from the Iceberg schema so column nullability
# matches `required=True` declarations — otherwise iceberg_table.append()
# rejects the table with a "required vs optional" mismatch.
return pa.table(
{
"user_id": user_ids,
"name": names,
"email": emails,
"country": countries,
},
schema=schema_to_pyarrow(USERS_SCHEMA),
)


def generate_transactions(n_txs: int, n_users: int, seed: int = 43) -> pa.Table:
rng = np.random.default_rng(seed)
tx_ids = np.arange(1, n_txs + 1, dtype=np.int64)
from_user = rng.integers(1, n_users + 1, size=n_txs, dtype=np.int64)
to_user = rng.integers(1, n_users + 1, size=n_txs, dtype=np.int64)

# Avoid self-loops: bump any equal target by 1 (wrapping at n_users).
eq = from_user == to_user
to_user[eq] = (to_user[eq] % n_users) + 1

amount = rng.uniform(1.0, 10000.0, size=n_txs).round(2)

# Spread timestamps uniformly across one year (microsecond precision).
base = np.datetime64("2025-01-01T00:00:00", "us")
seconds_in_year = 365 * 24 * 60 * 60
offsets_us = (
rng.integers(0, seconds_in_year, size=n_txs) * 1_000_000
).astype("timedelta64[us]")
timestamps = base + offsets_us

return pa.table(
{
"tx_id": tx_ids,
"from_user": from_user,
"to_user": to_user,
"amount": amount,
"timestamp": timestamps,
},
schema=schema_to_pyarrow(TX_SCHEMA),
)


def get_catalog() -> SqlCatalog:
WAREHOUSE.mkdir(parents=True, exist_ok=True)
return SqlCatalog(
"default",
**{
"uri": f"sqlite:///{WAREHOUSE}/catalog.db",
"warehouse": f"file://{WAREHOUSE}",
},
)


def ensure_namespace(catalog: SqlCatalog, namespace: str) -> None:
try:
catalog.load_namespace_properties(namespace)
except NoSuchNamespaceError:
catalog.create_namespace(namespace)


def reset_table(catalog: SqlCatalog, name: str, schema: Schema):
full = f"{NAMESPACE}.{name}"
try:
catalog.drop_table(full)
except NoSuchTableError:
pass
return catalog.create_table(full, schema=schema)


def write_iceberg(
catalog: SqlCatalog, name: str, schema: Schema, arrow_table: pa.Table
):
iceberg_table = reset_table(catalog, name, schema)
iceberg_table.append(arrow_table)
return iceberg_table


def main() -> None:
parser = argparse.ArgumentParser(
description="Generate synthetic users + transactions into local Iceberg tables."
)
parser.add_argument(
"--users",
type=int,
default=DEFAULT_USERS,
help=f"Number of users to generate (default: {DEFAULT_USERS:,})",
)
parser.add_argument(
"--transactions",
type=int,
default=DEFAULT_TXS,
help=f"Number of transactions to generate (default: {DEFAULT_TXS:,})",
)
args = parser.parse_args()

catalog = get_catalog()
ensure_namespace(catalog, NAMESPACE)

print(f"Generating {args.users:,} users...")
users_table = generate_users(args.users)
print(f"Generating {args.transactions:,} transactions...")
txs_table = generate_transactions(args.transactions, args.users)

print("Writing Iceberg tables...")
write_iceberg(catalog, "users", USERS_SCHEMA, users_table)
write_iceberg(catalog, "transactions", TX_SCHEMA, txs_table)

print(f"\nusers: {args.users:,} rows")
print(f"transactions: {args.transactions:,} rows")
print(f"warehouse: {WAREHOUSE}")


if __name__ == "__main__":
main()