6 changes: 6 additions & 0 deletions .github/workflows/test.yml
@@ -34,3 +34,9 @@ jobs:

- name: Run pre-commit checks
run: uv run pre-commit run --all-files

- name: Run unit tests
run: uv run pytest tests/unit -v --tb=short

- name: Generate coverage report
run: uv run pytest tests/unit --cov=scripts --cov-report=term-missing --cov-report=html
81 changes: 81 additions & 0 deletions examples/s1_quickstart.py
@@ -0,0 +1,81 @@
#!/usr/bin/env python3
"""Quick S1 GRD to GeoZarr conversion example.

Demonstrates end-to-end S1 pipeline:
1. Fetch S1 item from STAC
2. Convert to GeoZarr
3. Register in STAC catalog
4. Augment with preview links
"""

import subprocess
import sys
from pathlib import Path


def run_s1_pipeline(
stac_url: str = "https://stac.core.eopf.eodc.eu",
item_id: str = "S1C_IW_GRDH_1SDV_20251008T163126_20251008T163151_004473_008DBA_9AB4",
output_dir: Path = Path("./s1_output"),
) -> int:
"""Run S1 GRD pipeline locally."""

    output_dir.mkdir(parents=True, exist_ok=True)
geozarr_path = output_dir / f"{item_id}_geozarr.zarr"

print(f"🛰️ Processing S1 item: {item_id}")

# Step 1: Get source URL
print("\n1️⃣ Fetching STAC item...")
cmd = [
"python",
"scripts/get_zarr_url.py",
f"{stac_url}/collections/sentinel-1-l1-grd/items/{item_id}",
]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
source_url = result.stdout.strip()
print(f" Source: {source_url}")

# Step 2: Convert to GeoZarr
print("\n2️⃣ Converting to GeoZarr...")
cmd = [
"eopf-geozarr",
"convert",
source_url,
str(geozarr_path),
"--groups",
"/measurements",
"--gcp-group",
"/conditions/gcp",
"--spatial-chunk",
"2048",
"--verbose",
]
subprocess.run(cmd, check=True)
print(f" ✓ Created: {geozarr_path}")

# Step 3: Validate
print("\n3️⃣ Validating GeoZarr...")
cmd = ["eopf-geozarr", "validate", str(geozarr_path)]
subprocess.run(cmd, check=True)
print(" ✓ Valid GeoZarr")

print("\n✅ S1 pipeline complete!")
print(f" Output: {geozarr_path}")
print("\n Next steps:")
print(" - Upload to S3")
print(" - Register in STAC catalog")
print(" - View in titiler-eopf")

return 0


if __name__ == "__main__":
try:
sys.exit(run_s1_pipeline())
except subprocess.CalledProcessError as e:
print(f"\n❌ Pipeline failed: {e}", file=sys.stderr)
sys.exit(1)
except KeyboardInterrupt:
print("\n⚠️ Interrupted", file=sys.stderr)
sys.exit(130)
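To process a different scene, the entry point can also be imported and called with overrides. A minimal sketch, assuming `examples/` is importable from the repository root and that any non-default item ID exists in the `sentinel-1-l1-grd` collection:

```python
from pathlib import Path

from examples.s1_quickstart import run_s1_pipeline

# The values below are the module defaults; swap in another item ID only
# if it exists in the sentinel-1-l1-grd collection on the target STAC API.
exit_code = run_s1_pipeline(
    stac_url="https://stac.core.eopf.eodc.eu",
    item_id="S1C_IW_GRDH_1SDV_20251008T163126_20251008T163151_004473_008DBA_9AB4",
    output_dir=Path("./s1_output"),
)
```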
172 changes: 172 additions & 0 deletions scripts/test_s1_e2e.sh
@@ -0,0 +1,172 @@
#!/bin/bash
# Test S1 GRD end-to-end pipeline in devseed-staging namespace
#
# This script:
# 1. Applies the workflow template
# 2. Publishes an S1 test payload via AMQP
# 3. Waits for workflow completion
# 4. Shows logs and verifies STAC item was created
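#
# Requires kubectl, curl, and jq on PATH, plus cluster access through the
# kubeconfig resolved below.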

set -euo pipefail

# Set kubeconfig
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(dirname "$SCRIPT_DIR")"
export KUBECONFIG="${KUBECONFIG:-$PROJECT_ROOT/.work/kubeconfig}"

if [ ! -f "$KUBECONFIG" ]; then
echo "❌ Kubeconfig not found at: $KUBECONFIG"
echo "Please set KUBECONFIG environment variable or create .work/kubeconfig"
exit 1
fi

NAMESPACE="${NAMESPACE:-devseed-staging}"
PAYLOAD_FILE="${PAYLOAD_FILE:-workflows/examples/payload-s1.json}"
TIMEOUT="${TIMEOUT:-600}" # 10 minutes

echo "=========================================="
echo "S1 GRD Pipeline E2E Test"
echo "=========================================="
echo "Kubeconfig: $KUBECONFIG"
echo "Namespace: $NAMESPACE"
echo "Payload: $PAYLOAD_FILE"
echo "Timeout: ${TIMEOUT}s"
echo ""

# Step 1: Apply workflow template
echo "📝 Applying workflow template..."
kubectl -n "$NAMESPACE" apply -f workflows/template.yaml
echo "✅ Template applied"
echo ""

# Step 2: Publish AMQP message
echo "📤 Publishing test payload..."
kubectl -n "$NAMESPACE" delete job amqp-publish-once --ignore-not-found=true
kubectl -n "$NAMESPACE" delete configmap amqp-payload --ignore-not-found=true
kubectl -n "$NAMESPACE" create configmap amqp-payload --from-file=body.json="$PAYLOAD_FILE"
kubectl -n "$NAMESPACE" apply -f workflows/amqp-publish-once.yaml
echo "⏳ Waiting for publish job..."
kubectl -n "$NAMESPACE" wait --for=condition=complete --timeout=120s job/amqp-publish-once
echo "✅ Payload published"
echo ""

# Step 3: Get latest workflow
echo "🔍 Finding triggered workflow..."
sleep 3 # Give sensor time to create workflow
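# jsonpath items[-1:] selects the last element of the creationTimestamp-sorted
# list, i.e. the most recently created Workflow.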
WORKFLOW=$(kubectl -n "$NAMESPACE" get wf --sort-by=.metadata.creationTimestamp -o jsonpath='{.items[-1:].metadata.name}' 2>/dev/null || true)
if [ -z "$WORKFLOW" ]; then
echo "❌ No workflow found!"
exit 1
fi
echo "✅ Workflow: $WORKFLOW"
echo ""

# Step 4: Wait for completion
echo "⏳ Waiting for workflow completion (timeout: ${TIMEOUT}s)..."
START_TIME=$(date +%s)
while true; do
PHASE=$(kubectl -n "$NAMESPACE" get wf "$WORKFLOW" -o jsonpath='{.status.phase}' 2>/dev/null || echo "Unknown")
ELAPSED=$(($(date +%s) - START_TIME))

echo " [${ELAPSED}s] Phase: $PHASE"

case "$PHASE" in
Succeeded)
echo "✅ Workflow succeeded!"
break
;;
Failed|Error)
echo "❌ Workflow failed!"
break
;;
Unknown)
echo "❌ Workflow disappeared!"
exit 1
;;
esac

if [ $ELAPSED -ge $TIMEOUT ]; then
echo "⏰ Timeout reached!"
break
fi

sleep 5
done
echo ""

# Step 5: Show workflow details
echo "=========================================="
echo "Workflow Details"
echo "=========================================="
kubectl -n "$NAMESPACE" get wf "$WORKFLOW" -o jsonpath='
Name: {.metadata.name}
Status: {.status.phase}
Started: {.status.startedAt}
Finished: {.status.finishedAt}
Est. duration: {.status.estimatedDuration}

Parameters:
source_url: {.spec.arguments.parameters[?(@.name=="source_url")].value}
item_id: {.spec.arguments.parameters[?(@.name=="item_id")].value}
collection: {.spec.arguments.parameters[?(@.name=="register_collection")].value}
'
echo ""
echo ""

# Step 6: Show pod logs
echo "=========================================="
echo "Pod Logs"
echo "=========================================="
PODS=$(kubectl -n "$NAMESPACE" get pods -l workflows.argoproj.io/workflow="$WORKFLOW" -o name 2>/dev/null || true)
if [ -z "$PODS" ]; then
echo "⚠️ No pods found"
else
for POD in $PODS; do
POD_NAME=$(basename "$POD")
TEMPLATE=$(kubectl -n "$NAMESPACE" get pod "$POD_NAME" -o jsonpath='{.metadata.labels.workflows\.argoproj\.io/template}' 2>/dev/null || echo "unknown")
echo ""
echo "--- $POD_NAME ($TEMPLATE) ---"
kubectl -n "$NAMESPACE" logs "$POD_NAME" --tail=100 -c main 2>/dev/null || echo "No logs available"
done
fi
echo ""

# Step 7: Verify STAC item
echo "=========================================="
echo "STAC Item Verification"
echo "=========================================="
ITEM_ID=$(kubectl -n "$NAMESPACE" get wf "$WORKFLOW" -o jsonpath='{.spec.arguments.parameters[?(@.name=="item_id")].value}')
COLLECTION=$(kubectl -n "$NAMESPACE" get wf "$WORKFLOW" -o jsonpath='{.spec.arguments.parameters[?(@.name=="register_collection")].value}')
STAC_URL="https://api.explorer.eopf.copernicus.eu/stac/collections/$COLLECTION/items/$ITEM_ID"

echo "Checking: $STAC_URL"
ITEM_STATUS=$(curl -s -o /dev/null -w "%{http_code}" "$STAC_URL")
if [ "$ITEM_STATUS" = "200" ]; then
echo "✅ STAC item exists!"
echo ""
curl -s "$STAC_URL" | jq '{
id: .id,
collection: .collection,
geometry: .geometry.type,
assets: [.assets | keys[]],
links: [.links[] | select(.rel=="xyz" or .rel=="viewer" or .rel=="tilejson") | {rel, href}]
}'
else
echo "❌ STAC item not found (HTTP $ITEM_STATUS)"
fi
echo ""

echo "=========================================="
echo "Test Summary"
echo "=========================================="
echo "Workflow: $WORKFLOW"
echo "Status: $PHASE"
echo "STAC Item: $ITEM_STATUS"
echo ""
if [ "$PHASE" = "Succeeded" ] && [ "$ITEM_STATUS" = "200" ]; then
echo "🎉 END-TO-END TEST PASSED!"
exit 0
else
echo "❌ END-TO-END TEST FAILED"
exit 1
fi
Empty file added tests/__init__.py
Empty file.
133 changes: 133 additions & 0 deletions tests/conftest.py
@@ -0,0 +1,133 @@
"""Pytest configuration and shared fixtures for data-pipeline tests."""

import atexit
import copy
import sys
import warnings

import pytest

# Suppress noisy async context warnings from zarr/s3fs
warnings.filterwarnings("ignore", category=ResourceWarning)
warnings.filterwarnings("ignore", message="coroutine.*was never awaited")


# Global stderr filter that stays active even after pytest teardown
_original_stderr = sys.stderr
_suppress_traceback = False


class _FilteredStderr:
def write(self, text):
global _suppress_traceback

# Start suppressing when we see async context errors
if any(
marker in text
for marker in [
"Exception ignored",
"Traceback (most recent call last)",
"ValueError: <Token",
"was created in a different Context",
"zarr/storage/",
"s3fs/core.py",
"aiobotocore/context.py",
]
):
_suppress_traceback = True

# Reset suppression on empty lines (between tracebacks)
if not text.strip():
_suppress_traceback = False

# Only write if not currently suppressing
if not _suppress_traceback:
_original_stderr.write(text)

def flush(self):
_original_stderr.flush()


def _restore_stderr():
"""Restore original stderr at exit."""
sys.stderr = _original_stderr


# Install filter at module load time
sys.stderr = _FilteredStderr()
atexit.register(_restore_stderr)


@pytest.fixture(autouse=True, scope="function")
def clear_prometheus_registry():
"""Clear Prometheus registry before each test to avoid duplicates."""
import contextlib

try:
from prometheus_client import REGISTRY

        # _collector_to_names is a private prometheus_client attribute; there is
        # no public API for enumerating registered collectors.
        collectors = list(REGISTRY._collector_to_names.keys())
for collector in collectors:
with contextlib.suppress(Exception):
REGISTRY.unregister(collector)
except ImportError:
pass
yield


@pytest.fixture
def sample_stac_item():
"""Return a minimal STAC item for testing."""
return {
"type": "Feature",
"stac_version": "1.0.0",
"id": "test-item",
"properties": {
"datetime": "2025-01-01T00:00:00Z",
"proj:epsg": 32636,
},
"geometry": {
"type": "Polygon",
"coordinates": [
[
[600000, 6290220],
[709800, 6290220],
[709800, 6400020],
[600000, 6400020],
[600000, 6290220],
]
],
},
"links": [],
"assets": {
"B01": {
"href": "s3://bucket/data/B01.tif",
"type": "image/tiff; application=geotiff",
"roles": ["data"],
"proj:epsg": 32636,
"proj:shape": [10980, 10980],
"proj:transform": [10, 0, 600000, 0, -10, 6400020],
}
},
"collection": "test-collection",
}


@pytest.fixture
def stac_item_with_proj_code(sample_stac_item):
"""Return a STAC item with proj:code (should be removed)."""
    # Deep-copy so nested-dict mutations don't leak back into sample_stac_item.
    item = copy.deepcopy(sample_stac_item)
item["properties"]["proj:code"] = "EPSG:32636"
item["assets"]["B01"]["proj:code"] = "EPSG:32636"
return item


@pytest.fixture
def mock_zarr_url():
"""Return a sample GeoZarr URL."""
return "s3://bucket/path/to/dataset.zarr"


@pytest.fixture
def mock_stac_api_url():
"""Return a mock STAC API URL."""
return "https://api.example.com/stac"
Empty file added tests/integration/__init__.py
Empty file.