diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index a1223ce..5d4f38c 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -34,3 +34,9 @@ jobs:
       - name: Run pre-commit checks
         run: uv run pre-commit run --all-files
+
+      - name: Run unit tests
+        run: uv run pytest tests/unit -v --tb=short
+
+      - name: Generate coverage report
+        run: uv run pytest tests/unit --cov=scripts --cov-report=term-missing --cov-report=html
diff --git a/examples/s1_quickstart.py b/examples/s1_quickstart.py
new file mode 100644
index 0000000..b3dd09b
--- /dev/null
+++ b/examples/s1_quickstart.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python3
+"""Quick S1 GRD to GeoZarr conversion example.
+
+Demonstrates the local stages of the S1 pipeline:
+1. Fetch the S1 item's Zarr URL from STAC
+2. Convert to GeoZarr
+3. Validate the output
+(Registration and preview augmentation are printed as next steps.)
+"""
+
+import subprocess
+import sys
+from pathlib import Path
+
+
+def run_s1_pipeline(
+    stac_url: str = "https://stac.core.eopf.eodc.eu",
+    item_id: str = "S1C_IW_GRDH_1SDV_20251008T163126_20251008T163151_004473_008DBA_9AB4",
+    output_dir: Path = Path("./s1_output"),
+) -> int:
+    """Run the S1 GRD pipeline locally."""
+
+    output_dir.mkdir(parents=True, exist_ok=True)
+    geozarr_path = output_dir / f"{item_id}_geozarr.zarr"
+
+    print(f"šŸ›°ļø Processing S1 item: {item_id}")
+
+    # Step 1: Get source URL (run from the repo root so scripts/get_zarr_url.py resolves)
+    print("\n1ļøāƒ£ Fetching STAC item...")
+    cmd = [
+        "python",
+        "scripts/get_zarr_url.py",
+        f"{stac_url}/collections/sentinel-1-l1-grd/items/{item_id}",
+    ]
+    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
+    source_url = result.stdout.strip()
+    print(f"   Source: {source_url}")
+
+    # Step 2: Convert to GeoZarr
+    print("\n2ļøāƒ£ Converting to GeoZarr...")
+    cmd = [
+        "eopf-geozarr",
+        "convert",
+        source_url,
+        str(geozarr_path),
+        "--groups",
+        "/measurements",
+        "--gcp-group",
+        "/conditions/gcp",
+        "--spatial-chunk",
+        "2048",
+        "--verbose",
+    ]
+    subprocess.run(cmd, check=True)
+    print(f"   āœ“ Created: {geozarr_path}")
+
+    # Step 3: Validate
+    print("\n3ļøāƒ£ Validating GeoZarr...")
+    cmd = ["eopf-geozarr", "validate", str(geozarr_path)]
+    subprocess.run(cmd, check=True)
+    print("   āœ“ Valid GeoZarr")
+
+    print("\nāœ… S1 pipeline complete!")
+    print(f"   Output: {geozarr_path}")
+    print("\n   Next steps:")
+    print("   - Upload to S3")
+    print("   - Register in STAC catalog")
+    print("   - View in titiler-eopf")
+
+    return 0
+
+
+if __name__ == "__main__":
+    try:
+        sys.exit(run_s1_pipeline())
+    except subprocess.CalledProcessError as e:
+        print(f"\nāŒ Pipeline failed: {e}", file=sys.stderr)
+        sys.exit(1)
+    except KeyboardInterrupt:
+        print("\nāš ļø Interrupted", file=sys.stderr)
+        sys.exit(130)
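A minimal usage sketch for the example above (this assumes the project's
dependencies are installed with uv, and that the default item ID still exists
in the upstream STAC catalog):

    uv run python examples/s1_quickstart.py

Other items can be processed by calling run_s1_pipeline() directly with
different stac_url, item_id, or output_dir arguments.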
-f "$KUBECONFIG" ]; then + echo "āŒ Kubeconfig not found at: $KUBECONFIG" + echo "Please set KUBECONFIG environment variable or create .work/kubeconfig" + exit 1 +fi + +NAMESPACE="${NAMESPACE:-devseed-staging}" +PAYLOAD_FILE="${PAYLOAD_FILE:-workflows/examples/payload-s1.json}" +TIMEOUT="${TIMEOUT:-600}" # 10 minutes + +echo "==========================================" +echo "S1 GRD Pipeline E2E Test" +echo "==========================================" +echo "Kubeconfig: $KUBECONFIG" +echo "Namespace: $NAMESPACE" +echo "Payload: $PAYLOAD_FILE" +echo "Timeout: ${TIMEOUT}s" +echo "" + +# Step 1: Apply workflow template +echo "šŸ“ Applying workflow template..." +kubectl -n "$NAMESPACE" apply -f workflows/template.yaml +echo "āœ… Template applied" +echo "" + +# Step 2: Publish AMQP message +echo "šŸ“¤ Publishing test payload..." +kubectl -n "$NAMESPACE" delete job amqp-publish-once --ignore-not-found=true +kubectl -n "$NAMESPACE" delete configmap amqp-payload --ignore-not-found=true +kubectl -n "$NAMESPACE" create configmap amqp-payload --from-file=body.json="$PAYLOAD_FILE" +kubectl -n "$NAMESPACE" apply -f workflows/amqp-publish-once.yaml +echo "ā³ Waiting for publish job..." +kubectl -n "$NAMESPACE" wait --for=condition=complete --timeout=120s job/amqp-publish-once +echo "āœ… Payload published" +echo "" + +# Step 3: Get latest workflow +echo "šŸ” Finding triggered workflow..." +sleep 3 # Give sensor time to create workflow +WORKFLOW=$(kubectl -n "$NAMESPACE" get wf --sort-by=.metadata.creationTimestamp -o jsonpath='{.items[-1:].metadata.name}' 2>/dev/null || true) +if [ -z "$WORKFLOW" ]; then + echo "āŒ No workflow found!" + exit 1 +fi +echo "āœ… Workflow: $WORKFLOW" +echo "" + +# Step 4: Wait for completion +echo "ā³ Waiting for workflow completion (timeout: ${TIMEOUT}s)..." +START_TIME=$(date +%s) +while true; do + PHASE=$(kubectl -n "$NAMESPACE" get wf "$WORKFLOW" -o jsonpath='{.status.phase}' 2>/dev/null || echo "Unknown") + ELAPSED=$(($(date +%s) - START_TIME)) + + echo " [${ELAPSED}s] Phase: $PHASE" + + case "$PHASE" in + Succeeded) + echo "āœ… Workflow succeeded!" + break + ;; + Failed|Error) + echo "āŒ Workflow failed!" + break + ;; + Unknown) + echo "āŒ Workflow disappeared!" + exit 1 + ;; + esac + + if [ $ELAPSED -ge $TIMEOUT ]; then + echo "ā° Timeout reached!" 
+echo ""
+
+# Step 5: Show workflow details
+echo "=========================================="
+echo "Workflow Details"
+echo "=========================================="
+kubectl -n "$NAMESPACE" get wf "$WORKFLOW" -o jsonpath='
+Name: {.metadata.name}
+Status: {.status.phase}
+Started: {.status.startedAt}
+Finished: {.status.finishedAt}
+Est. duration: {.status.estimatedDuration}
+
+Parameters:
+  source_url: {.spec.arguments.parameters[?(@.name=="source_url")].value}
+  item_id: {.spec.arguments.parameters[?(@.name=="item_id")].value}
+  collection: {.spec.arguments.parameters[?(@.name=="register_collection")].value}
+'
+echo ""
+echo ""
+
+# Step 6: Show pod logs
+echo "=========================================="
+echo "Pod Logs"
+echo "=========================================="
+PODS=$(kubectl -n "$NAMESPACE" get pods -l workflows.argoproj.io/workflow="$WORKFLOW" -o name 2>/dev/null || true)
+if [ -z "$PODS" ]; then
+    echo "āš ļø No pods found"
+else
+    for POD in $PODS; do
+        POD_NAME=$(basename "$POD")
+        TEMPLATE=$(kubectl -n "$NAMESPACE" get pod "$POD_NAME" -o jsonpath='{.metadata.labels.workflows\.argoproj\.io/template}' 2>/dev/null || echo "unknown")
+        echo ""
+        echo "--- $POD_NAME ($TEMPLATE) ---"
+        kubectl -n "$NAMESPACE" logs "$POD_NAME" --tail=100 -c main 2>/dev/null || echo "No logs available"
+    done
+fi
+echo ""
+
+# Step 7: Verify STAC item
+echo "=========================================="
+echo "STAC Item Verification"
+echo "=========================================="
+ITEM_ID=$(kubectl -n "$NAMESPACE" get wf "$WORKFLOW" -o jsonpath='{.spec.arguments.parameters[?(@.name=="item_id")].value}')
+COLLECTION=$(kubectl -n "$NAMESPACE" get wf "$WORKFLOW" -o jsonpath='{.spec.arguments.parameters[?(@.name=="register_collection")].value}')
+STAC_URL="https://api.explorer.eopf.copernicus.eu/stac/collections/$COLLECTION/items/$ITEM_ID"
+
+echo "Checking: $STAC_URL"
+ITEM_STATUS=$(curl -s -o /dev/null -w "%{http_code}" "$STAC_URL")
+if [ "$ITEM_STATUS" = "200" ]; then
+    echo "āœ… STAC item exists!"
+    echo ""
+    curl -s "$STAC_URL" | jq '{
+      id: .id,
+      collection: .collection,
+      geometry: .geometry.type,
+      assets: [.assets | keys[]],
+      links: [.links[] | select(.rel=="xyz" or .rel=="viewer" or .rel=="tilejson") | {rel, href}]
+    }'
+else
+    echo "āŒ STAC item not found (HTTP $ITEM_STATUS)"
+fi
+echo ""
+
+echo "=========================================="
+echo "Test Summary"
+echo "=========================================="
+echo "Workflow: $WORKFLOW"
+echo "Status: $PHASE"
+echo "STAC Item: $ITEM_STATUS"
+echo ""
+if [ "$PHASE" = "Succeeded" ] && [ "$ITEM_STATUS" = "200" ]; then
+    echo "šŸŽ‰ END-TO-END TEST PASSED!"
+    exit 0
+else
+    echo "āŒ END-TO-END TEST FAILED"
+    exit 1
+fi
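A usage sketch for the script above (assuming kubectl access to the staging
cluster, with a kubeconfig at .work/kubeconfig or pointed to by KUBECONFIG):

    # Defaults: devseed-staging namespace, workflows/examples/payload-s1.json, 600 s timeout
    NAMESPACE=devseed-staging TIMEOUT=900 ./scripts/test_s1_e2e.sh

NAMESPACE, PAYLOAD_FILE, and TIMEOUT can each be overridden via the
environment, as shown for TIMEOUT here.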
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..c967b78
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,133 @@
+"""Pytest configuration and shared fixtures for data-pipeline tests."""
+
+import atexit
+import sys
+import warnings
+
+import pytest
+
+# Suppress noisy async context warnings from zarr/s3fs
+warnings.filterwarnings("ignore", category=ResourceWarning)
+warnings.filterwarnings("ignore", message="coroutine.*was never awaited")
+
+
+# Global stderr filter that stays active even after pytest teardown
+_original_stderr = sys.stderr
+_suppress_traceback = False
+
+
+class _FilteredStderr:
+    def write(self, text):
+        global _suppress_traceback
+
+        # Start suppressing when we see async context errors
+        if any(
+            marker in text
+            for marker in [
+                "Exception ignored",
+                "Traceback (most recent call last)",
+                "ValueError: