diff --git a/scripts/README_update_storage_tier.md b/scripts/README_update_storage_tier.md new file mode 100644 index 0000000..8ff06b9 --- /dev/null +++ b/scripts/README_update_storage_tier.md @@ -0,0 +1,103 @@ +# Update STAC Storage Tier Metadata + +Updates existing STAC items with current S3 storage tier metadata. + +## Modes + +**Update (default)**: Updates `ovh:storage_tier` for assets with existing `alternate.s3` + +**Add Missing (`--add-missing`)**: Creates `alternate.s3` structure for legacy items without it + +## Storage Tier Detection + +- **Single file**: Returns tier directly (e.g., `"GLACIER"`) +- **Uniform Zarr**: All files same tier (e.g., `"GLACIER"` + distribution) +- **Mixed Zarr**: Different tiers detected (tier: `"MIXED"` + distribution breakdown) + +Distribution shows file counts per tier, based on sample of up to 100 files. + +### Example: Mixed Storage +```json +{ + "ovh:storage_tier": "MIXED", + "ovh:storage_tier_distribution": { + "STANDARD": 450, + "GLACIER": 608 + } +} +``` + +## Notes + +- Thumbnail assets automatically skipped +- Failed S3 queries remove existing `ovh:storage_tier` field +- Distribution metadata only for Zarr directories + +## Setup + +```bash +# Install dependencies +uv sync + +# Set environment variables +export AWS_ACCESS_KEY_ID="your-access-key" +export AWS_SECRET_ACCESS_KEY="your-secret-key" +export S3_ENDPOINT="https://s3.de.io.cloud.ovh.net" +export STAC_API_URL="https://api.explorer.eopf.copernicus.eu/stac/" +export ITEM_URL="${STAC_API_URL}/collections/sentinel-2-l2a/items/ITEM_ID" +``` + +## Usage + +```bash +# Dry run (preview changes) +uv run python scripts/update_stac_storage_tier.py \ + --stac-item-url "$ITEM_URL" \ + --stac-api-url "$STAC_API_URL" \ + --s3-endpoint "$S3_ENDPOINT" \ + --dry-run + +# Update existing alternate.s3 +uv run python scripts/update_stac_storage_tier.py \ + --stac-item-url "$ITEM_URL" \ + --stac-api-url "$STAC_API_URL" \ + --s3-endpoint "$S3_ENDPOINT" + +# Add missing alternate.s3 (legacy items) +uv run python scripts/update_stac_storage_tier.py \ + --stac-item-url "$ITEM_URL" \ + --stac-api-url "$STAC_API_URL" \ + --s3-endpoint "$S3_ENDPOINT" \ + --add-missing +``` + +## Output Examples + +**Success:** +``` +Processing: S2A_MSIL2A_20251008T100041_N0511_R122_T32TQM_20251008T122613 + Assets with alternate.s3: 15 + Assets with queryable storage tier: 15 + Assets updated: 15 + ✅ Updated item (HTTP 201) +``` + +**Mixed storage detected:** +``` +Processing: S2A_MSIL2A_20251208T100431_N0511_R122_T32TQQ_20251208T121910 + reflectance: Mixed storage detected - {'STANDARD': 450, 'GLACIER': 608} + Assets updated: 1 + ✅ Updated item (HTTP 201) +``` + +**S3 query failures:** +``` + ⚠️ Failed to query storage tier from S3 for 4 asset(s) + Check AWS credentials, S3 permissions, or if objects are Zarr directories +``` + +## Related Scripts + +- `register_v1.py` - Initial STAC registration (includes storage tier) +- `change_storage_tier.py` - Change S3 storage classes +- `storage_tier_utils.py` - Shared utilities for storage tier operations diff --git a/scripts/register_v1.py b/scripts/register_v1.py index 503d21e..0c66fcc 100755 --- a/scripts/register_v1.py +++ b/scripts/register_v1.py @@ -15,6 +15,7 @@ from pystac import Asset, Item, Link from pystac.extensions.projection import ProjectionExtension from pystac_client import Client +from storage_tier_utils import extract_region_from_endpoint, get_s3_storage_class # Configure logging (set LOG_LEVEL=DEBUG for verbose output) logging.basicConfig( @@ -353,20 +354,8 @@ def 
add_alternate_s3_assets(item: Item, s3_endpoint: str) -> None: if ext not in item.stac_extensions: item.stac_extensions.append(ext) - # Parse endpoint to extract region info - # For OVHcloud endpoints like "s3.de.io.cloud.ovh.net", region is "de" - endpoint_host = urlparse(s3_endpoint).netloc or urlparse(s3_endpoint).path - region = "unknown" - if ".de." in endpoint_host: - region = "de" - elif ".gra." in endpoint_host: - region = "gra" - elif ".sbg." in endpoint_host: - region = "sbg" - elif ".uk." in endpoint_host: - region = "uk" - elif ".ca." in endpoint_host: - region = "ca" + # Extract region from endpoint + region = extract_region_from_endpoint(s3_endpoint) # Add alternate to each asset with data role that has an HTTPS URL modified_count = 0 @@ -383,18 +372,40 @@ def add_alternate_s3_assets(item: Item, s3_endpoint: str) -> None: if not s3_url: continue + # Query storage class for this asset + storage_tier = get_s3_storage_class(s3_url, s3_endpoint) + # Add alternate with storage extension fields if not hasattr(asset, "extra_fields"): asset.extra_fields = {} - asset.extra_fields["alternate"] = { - "s3": { - "href": s3_url, - "storage:platform": "OVHcloud", - "storage:region": region, - "storage:requester_pays": False, - } + # Preserve existing alternate structure if present + existing_alternate = asset.extra_fields.get("alternate", {}) + if not isinstance(existing_alternate, dict): + existing_alternate = {} + + # Get existing s3 alternate or create new one + existing_s3 = existing_alternate.get("s3", {}) + if not isinstance(existing_s3, dict): + existing_s3 = {} + + # Update s3 alternate (preserving any existing fields) + s3_alternate = { + **existing_s3, # Preserve existing fields + "href": s3_url, + "storage:platform": "OVHcloud", + "storage:region": region, + "storage:requester_pays": False, } + + # Add storage tier as a custom field (not part of storage extension spec) + # Using ovh: prefix to indicate vendor-specific extension + if storage_tier: + s3_alternate["ovh:storage_tier"] = storage_tier + + # Preserve other alternate formats (e.g., alternate.xarray if it exists) + existing_alternate["s3"] = s3_alternate + asset.extra_fields["alternate"] = existing_alternate modified_count += 1 if modified_count > 0: @@ -648,19 +659,20 @@ def run_registration( remove_xarray_integration(item) # 7. Add alternate S3 URLs to assets (alternate-assets + storage extensions) + # This also queries and adds storage:tier to each asset's alternate add_alternate_s3_assets(item, s3_endpoint) # 8. Add visualization links (viewer, xyz, tilejson) add_visualization_links(item, raster_api_url, collection) logger.info(" 🎨 Added visualization links") - # 9. Add thumbnail asset for STAC browsers + # 10. Add thumbnail asset for STAC browsers add_thumbnail_asset(item, raster_api_url, collection) - # 10. Add derived_from link to source item + # 11. Add derived_from link to source item add_derived_from_link(item, source_url) - # 11. Register to STAC API + # 12. 
Register to STAC API client = Client.open(stac_api_url) upsert_item(client, collection, item) diff --git a/scripts/storage_tier_utils.py b/scripts/storage_tier_utils.py new file mode 100644 index 0000000..3ad0a5c --- /dev/null +++ b/scripts/storage_tier_utils.py @@ -0,0 +1,291 @@ +#!/usr/bin/env python3 +"""Shared utilities for S3 storage tier operations.""" + +from __future__ import annotations + +import logging +import os +from typing import Literal, TypedDict +from urllib.parse import urlparse + +import boto3 +from botocore.exceptions import ClientError + +logger = logging.getLogger(__name__) + +# Valid OVH Cloud storage tiers (as confirmed in PR #62) +# https://github.com/EOPF-Explorer/data-pipeline/pull/62 +OVHStorageTier = Literal["STANDARD", "STANDARD_IA", "EXPRESS_ONEZONE"] +VALID_STORAGE_TIERS: frozenset[str] = frozenset(["STANDARD", "STANDARD_IA", "EXPRESS_ONEZONE"]) + +# Special tier value for mixed storage detection +StorageTier = Literal["STANDARD", "STANDARD_IA", "EXPRESS_ONEZONE", "MIXED"] + + +class StorageTierInfo(TypedDict): + """Storage tier information including distribution for mixed storage. + + Attributes: + tier: Storage class - must be one of: STANDARD, STANDARD_IA, EXPRESS_ONEZONE, or MIXED + distribution: Count of objects per tier (None for single files, dict for Zarr directories) + """ + + tier: str # StorageTier - using str for backwards compatibility + distribution: dict[str, int] | None + + +def validate_storage_tier(tier: str) -> bool: + """Validate that a storage tier is a valid OVH Cloud storage class. + + Args: + tier: Storage tier string to validate + + Returns: + True if tier is valid (STANDARD, STANDARD_IA, or EXPRESS_ONEZONE), False otherwise + + Note: + This function validates against actual OVH Cloud storage classes. + Do not use AWS-specific tiers like GLACIER, GLACIER_IR, DEEP_ARCHIVE, etc. + """ + return tier in VALID_STORAGE_TIERS + + +def get_s3_storage_class(s3_url: str, s3_endpoint: str) -> str | None: + """Get the storage class of an S3 object. + + Args: + s3_url: S3 URL (s3://bucket/key) + s3_endpoint: S3 endpoint URL + + Returns: + Storage class name (e.g., 'STANDARD', 'STANDARD_IA', 'EXPRESS_ONEZONE') or None if unavailable + + Note: This function returns the most common storage class for Zarr directories. + Use get_s3_storage_info() for detailed distribution information. 
+ """ + if not s3_url.startswith("s3://"): + return None + + parsed = urlparse(s3_url) + bucket = parsed.netloc + key = parsed.path.lstrip("/") + + if not key: # No key specified (root bucket) + return None + + try: + # Initialize S3 client with endpoint + s3_config = {} + if s3_endpoint: + s3_config["endpoint_url"] = s3_endpoint + elif os.getenv("AWS_ENDPOINT_URL"): + s3_config["endpoint_url"] = os.getenv("AWS_ENDPOINT_URL") # type: ignore + + s3_client = boto3.client("s3", **s3_config) # type: ignore + + # Try to query the key directly first + try: + response = s3_client.head_object(Bucket=bucket, Key=key) + except ClientError as e: + # If 404, this might be a Zarr directory path (without trailing /) + # Try listing objects under this prefix + if e.response.get("Error", {}).get("Code") == "404": + prefix = key if key.endswith("/") else key + "/" + list_response = s3_client.list_objects_v2( + Bucket=bucket, + Prefix=prefix, + MaxKeys=100, # Check up to 100 files + ) + if "Contents" not in list_response or len(list_response["Contents"]) == 0: + logger.debug(f"No objects found under prefix {s3_url}") + return None + + # Extract storage classes from list response (no need for additional head_object calls) + storage_classes = [] + for obj in list_response["Contents"]: + # StorageClass field is included in list_objects_v2 response + obj_class = obj.get("StorageClass", "STANDARD") + storage_classes.append(obj_class) + + if not storage_classes: + logger.debug(f"Could not determine storage class for any object under {s3_url}") + return None + + # Count occurrences of each storage class + storage_class_counts: dict[str, int] = {} + for sc in storage_classes: + storage_class_counts[sc] = storage_class_counts.get(sc, 0) + 1 + + # Check for mixed storage classes + unique_classes = set(storage_classes) + if len(unique_classes) > 1: + total_files = len(storage_classes) + distribution = ", ".join( + f"{sc}: {count}/{total_files}" + for sc, count in sorted(storage_class_counts.items()) + ) + logger.warning( + f"Mixed storage classes detected for {s3_url}: {distribution}. " + f"Returning most common class." + ) + + # Return the most common storage class (or first alphabetically if tied) + most_common = max(storage_class_counts.items(), key=lambda x: (x[1], x[0])) + storage_class: str = most_common[0] + logger.debug( + f"Sampled {len(storage_classes)} files under {s3_url}, " + f"storage class: {storage_class}" + ) + return storage_class + else: + raise + + # StorageClass is not present for STANDARD tier, default to STANDARD + storage_class_value: str = response.get("StorageClass", "STANDARD") + return storage_class_value + except ClientError as e: + error_code = e.response.get("Error", {}).get("Code", "Unknown") + logger.debug(f"ClientError ({error_code}) for {s3_url}: {e}") + return None + except Exception as e: + logger.error( + f"Unexpected error getting storage class for {s3_url}: {type(e).__name__}: {e}" + ) + return None + + +def get_s3_storage_info( + s3_url: str, s3_endpoint: str, max_samples: int = 100 +) -> StorageTierInfo | None: + """Get detailed storage tier information for an S3 object or Zarr directory. + + For single files, returns the storage class. For Zarr directories, samples up to + max_samples files and returns distribution information. If storage classes are + mixed, returns "MIXED" with a distribution dictionary. 
+ + Args: + s3_url: S3 URL (s3://bucket/key) + s3_endpoint: S3 endpoint URL + max_samples: Maximum number of files to sample for Zarr directories (default: 100) + + Returns: + StorageTierInfo dict with 'tier' and 'distribution' keys, or None if unavailable + - tier: Storage class ("STANDARD", "STANDARD_IA", "MIXED", etc.) + - distribution: Dict of {storage_class: count} (only present if MIXED or multiple files sampled) + + Examples: + Single file: + {'tier': 'STANDARD', 'distribution': None} + + Zarr directory (uniform): + {'tier': 'STANDARD_IA', 'distribution': {'STANDARD_IA': 50}} + + Zarr directory (mixed): + {'tier': 'MIXED', 'distribution': {'STANDARD': 450, 'STANDARD_IA': 608}} + """ + if not s3_url.startswith("s3://"): + return None + + parsed = urlparse(s3_url) + bucket = parsed.netloc + key = parsed.path.lstrip("/") + + if not key: # No key specified (root bucket) + return None + + try: + # Initialize S3 client with endpoint + s3_config = {} + if s3_endpoint: + s3_config["endpoint_url"] = s3_endpoint + elif os.getenv("AWS_ENDPOINT_URL"): + s3_config["endpoint_url"] = os.getenv("AWS_ENDPOINT_URL") # type: ignore + + s3_client = boto3.client("s3", **s3_config) # type: ignore + + # Try to query the key directly first (single file) + try: + response = s3_client.head_object(Bucket=bucket, Key=key) + storage_class = response.get("StorageClass", "STANDARD") + return {"tier": storage_class, "distribution": None} + except ClientError as e: + # If 404, this might be a Zarr directory path + if e.response.get("Error", {}).get("Code") == "404": + prefix = key if key.endswith("/") else key + "/" + list_response = s3_client.list_objects_v2( + Bucket=bucket, Prefix=prefix, MaxKeys=max_samples + ) + if "Contents" not in list_response or len(list_response["Contents"]) == 0: + logger.debug(f"No objects found under prefix {s3_url}") + return None + + # Extract storage classes from list response + storage_classes = [] + for obj in list_response["Contents"]: + obj_class = obj.get("StorageClass", "STANDARD") + storage_classes.append(obj_class) + + if not storage_classes: + logger.debug(f"Could not determine storage class for any object under {s3_url}") + return None + + # Count occurrences of each storage class + storage_class_counts: dict[str, int] = {} + for sc in storage_classes: + storage_class_counts[sc] = storage_class_counts.get(sc, 0) + 1 + + # Check for mixed storage classes + unique_classes = set(storage_classes) + if len(unique_classes) > 1: + # Mixed storage detected + total_files = len(storage_classes) + distribution_str = ", ".join( + f"{sc}: {count}/{total_files}" + for sc, count in sorted(storage_class_counts.items()) + ) + logger.info(f"Mixed storage classes detected for {s3_url}: {distribution_str}") + return {"tier": "MIXED", "distribution": storage_class_counts} + else: + # Uniform storage class + storage_class = list(unique_classes)[0] + logger.debug( + f"Sampled {len(storage_classes)} files under {s3_url}, " + f"uniform storage class: {storage_class}" + ) + return {"tier": storage_class, "distribution": storage_class_counts} + else: + raise + except ClientError as e: + error_code = e.response.get("Error", {}).get("Code", "Unknown") + logger.debug(f"ClientError ({error_code}) for {s3_url}: {e}") + return None + except Exception as e: + logger.error(f"Unexpected error getting storage info for {s3_url}: {type(e).__name__}: {e}") + return None + + +def extract_region_from_endpoint(s3_endpoint: str) -> str: + """Extract region from S3 endpoint URL. 
+ + For OVHcloud endpoints like "s3.de.io.cloud.ovh.net", extracts region code. + + Args: + s3_endpoint: S3 endpoint URL + + Returns: + Region code (e.g., 'de', 'gra', 'sbg', 'uk', 'ca') or 'unknown' + """ + endpoint_host = urlparse(s3_endpoint).netloc or urlparse(s3_endpoint).path + + if ".de." in endpoint_host: + return "de" + elif ".gra." in endpoint_host: + return "gra" + elif ".sbg." in endpoint_host: + return "sbg" + elif ".uk." in endpoint_host: + return "uk" + elif ".ca." in endpoint_host: + return "ca" + + return "unknown" diff --git a/scripts/update_stac_storage_tier.py b/scripts/update_stac_storage_tier.py new file mode 100644 index 0000000..95e9bdc --- /dev/null +++ b/scripts/update_stac_storage_tier.py @@ -0,0 +1,389 @@ +#!/usr/bin/env python3 +"""Update STAC items with current S3 storage tier metadata. + +This script fetches STAC items and updates their alternate.s3 objects with +current storage tier information from S3. It can also be used to update existing items +that were registered before storage tier tracking was implemented. +""" + +from __future__ import annotations + +import argparse +import logging +import os +import sys +from pathlib import Path + +# Add scripts directory to path to import from register_v1 +scripts_dir = Path(__file__).parent +if str(scripts_dir) not in sys.path: + sys.path.insert(0, str(scripts_dir)) + +import httpx # noqa: E402 +from pystac import Item # noqa: E402 +from pystac_client import Client # noqa: E402 +from register_v1 import https_to_s3 # noqa: E402 +from storage_tier_utils import ( # noqa: E402 + extract_region_from_endpoint, + get_s3_storage_info, +) + +# Configure logging +logging.basicConfig( + level=os.getenv("LOG_LEVEL", "INFO"), + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + +for lib in ["botocore", "boto3", "urllib3", "httpx", "httpcore"]: + logging.getLogger(lib).setLevel(logging.WARNING) + + +def update_item_storage_tiers( + item: Item, s3_endpoint: str, add_missing: bool = False +) -> tuple[int, int, int, int, int, int]: + """Update storage tier metadata for all assets in a STAC item. 
+ + Args: + item: STAC item to update + s3_endpoint: S3 endpoint URL + add_missing: If True, add alternate.s3 to assets that don't have it + + Returns: + Tuple of (assets_updated, assets_with_alternate_s3, assets_with_tier, + assets_added, assets_skipped, assets_s3_failed) + """ + # Ensure required extensions are present + extensions = [ + "https://stac-extensions.github.io/alternate-assets/v1.2.0/schema.json", + "https://stac-extensions.github.io/storage/v2.0.0/schema.json", + ] + + if not hasattr(item, "stac_extensions"): + item.stac_extensions = [] + + for ext in extensions: + if ext not in item.stac_extensions: + item.stac_extensions.append(ext) + + region = extract_region_from_endpoint(s3_endpoint) + + assets_updated = 0 + assets_with_alternate_s3 = 0 + assets_with_tier = 0 + assets_added = 0 + assets_skipped = 0 + assets_s3_failed = 0 + + for asset_key, asset in item.assets.items(): + # Skip thumbnail and other non-data assets + if asset.roles and "thumbnail" in asset.roles: + continue + + # Initialize extra_fields if needed + if not hasattr(asset, "extra_fields"): + asset.extra_fields = {} + + # Check if asset has alternate.s3 + has_alternate = ( + "alternate" in asset.extra_fields + and isinstance(asset.extra_fields["alternate"], dict) + and "s3" in asset.extra_fields["alternate"] + ) + + # If no alternate.s3 and add_missing is True, try to add it + if not has_alternate and add_missing: + if not asset.href or not asset.href.startswith("https://"): + logger.info(f" {asset_key}: Skipping (href: {asset.href})") + assets_skipped += 1 + continue + + # Try to convert HTTPS URL to S3 URL + s3_url = https_to_s3(asset.href) + if not s3_url: + logger.info(f" {asset_key}: Could not convert to S3 URL (href: {asset.href})") + assets_skipped += 1 + continue + + # Query storage tier from S3 (s3_url is guaranteed to be str here) + storage_info = get_s3_storage_info(s3_url, s3_endpoint) + + if storage_info is None: + logger.warning( + f" {asset_key}: Could not query storage tier from S3 (check credentials/permissions)" + ) + logger.warning(f" {asset_key}: Skipping - cannot verify S3 object exists") + assets_s3_failed += 1 + assets_skipped += 1 + continue + + tier = storage_info["tier"] + + # Preserve existing alternate structure if present + existing_alternate = asset.extra_fields.get("alternate", {}) + if not isinstance(existing_alternate, dict): + existing_alternate = {} + + # Create alternate.s3 object + s3_alternate = { + "href": s3_url, + "storage:platform": "OVHcloud", + "storage:region": region, + "storage:requester_pays": False, + "ovh:storage_tier": tier, + } + + # Add distribution if storage is mixed or multiple files sampled + if storage_info["distribution"] is not None: + s3_alternate["ovh:storage_tier_distribution"] = storage_info["distribution"] + + # Preserve other alternate formats (e.g., alternate.xarray if it exists) + existing_alternate["s3"] = s3_alternate + asset.extra_fields["alternate"] = existing_alternate + assets_added += 1 + assets_updated += 1 + assets_with_tier += 1 + logger.info(f" {asset_key}: Added alternate.s3 with tier {tier}") + continue + + # If no alternate.s3 and not adding, skip + if not has_alternate: + continue + + # Asset has alternate.s3 + assets_with_alternate_s3 += 1 + + # Update existing alternate.s3 + s3_info = asset.extra_fields["alternate"]["s3"] + if not isinstance(s3_info, dict) or "href" not in s3_info: + continue + + s3_url = s3_info["href"] + if not isinstance(s3_url, str): + continue + + # Query current storage tier from S3 + storage_info = 
get_s3_storage_info(s3_url, s3_endpoint) + + if storage_info is None: + logger.warning( + f" {asset_key}: Could not query storage tier from S3 (check credentials/permissions)" + ) + assets_s3_failed += 1 + storage_tier: str | None = None + else: + storage_tier = storage_info["tier"] + + # Track if anything changed + asset_changed = False + old_tier = s3_info.get("ovh:storage_tier") + + # Update or add storage extension fields (only if missing) + if "storage:platform" not in s3_info: + s3_info["storage:platform"] = "OVHcloud" + asset_changed = True + if "storage:region" not in s3_info: + s3_info["storage:region"] = region + asset_changed = True + if "storage:requester_pays" not in s3_info: + s3_info["storage:requester_pays"] = False + asset_changed = True + + # Add/update storage tier if available + if storage_tier: + s3_info["ovh:storage_tier"] = storage_tier + assets_with_tier += 1 + + # Add or update distribution if available + if storage_info and storage_info.get("distribution") is not None: + s3_info["ovh:storage_tier_distribution"] = storage_info["distribution"] + if storage_tier == "MIXED": + logger.info( + f" {asset_key}: Mixed storage detected - {storage_info['distribution']}" + ) + else: + # Remove distribution if no longer mixed + if "ovh:storage_tier_distribution" in s3_info: + del s3_info["ovh:storage_tier_distribution"] + asset_changed = True + + if old_tier != storage_tier: + asset_changed = True + logger.debug(f" {asset_key}: {old_tier or 'none'} -> {storage_tier}") + else: + # Remove tier if it cannot be determined + if "ovh:storage_tier" in s3_info: + del s3_info["ovh:storage_tier"] + asset_changed = True + logger.debug(f" {asset_key}: removed tier (not available)") + # Also remove distribution if present + if "ovh:storage_tier_distribution" in s3_info: + del s3_info["ovh:storage_tier_distribution"] + asset_changed = True + + if asset_changed: + assets_updated += 1 + + return ( + assets_updated, + assets_with_alternate_s3, + assets_with_tier, + assets_added, + assets_skipped, + assets_s3_failed, + ) + + +def update_stac_item( + stac_item_url: str, + stac_api_url: str, + s3_endpoint: str, + dry_run: bool = False, + add_missing: bool = False, +) -> dict[str, int]: + """Update storage tier metadata for a STAC item. 
+ + Args: + stac_item_url: STAC item URL (can be from STAC API or standalone) + stac_api_url: STAC API base URL for updates + s3_endpoint: S3 endpoint URL + dry_run: If True, show changes without updating + add_missing: If True, add alternate.s3 to assets that don't have it + + Returns: + Dictionary with update statistics + """ + # Extract collection and item ID from URL + # Expected format: .../collections/{collection}/items/{item_id} + parts = stac_item_url.rstrip("/").split("/") + if "items" in parts: + item_idx = parts.index("items") + item_id = parts[item_idx + 1] + collection_id = parts[parts.index("collections") + 1] if "collections" in parts else None + else: + logger.error("Could not extract item ID from URL") + return {"updated": 0, "with_tier": 0} + + logger.info(f"Processing: {item_id}") + + # Fetch STAC item + with httpx.Client(timeout=30.0, follow_redirects=True) as http: + resp = http.get(stac_item_url) + resp.raise_for_status() + item = Item.from_dict(resp.json()) + + # Update storage tiers + ( + assets_updated, + assets_with_alternate_s3, + assets_with_tier, + assets_added, + assets_skipped, + assets_s3_failed, + ) = update_item_storage_tiers(item, s3_endpoint, add_missing) + + total_with_alternate = assets_with_alternate_s3 + assets_added + logger.info(f" Assets with alternate.s3: {total_with_alternate}") + if assets_added > 0: + logger.info(f" - Newly added: {assets_added}") + if assets_with_alternate_s3 > 0: + logger.info(f" - Already present: {assets_with_alternate_s3}") + + logger.info(f" Assets with queryable storage tier: {assets_with_tier}") + + if assets_s3_failed > 0: + logger.warning(f" ⚠️ Failed to query storage tier from S3 for {assets_s3_failed} asset(s)") + logger.warning( + " Check AWS credentials, S3 permissions, or if objects are Zarr directories" + ) + + if assets_skipped > 0: + logger.info(f" Assets skipped: {assets_skipped}") + + logger.info(f" Assets updated: {assets_updated}") + + if dry_run: + logger.info(" (DRY RUN - no changes made)") + return { + "updated": assets_updated, + "with_alternate_s3": total_with_alternate, + "with_tier": assets_with_tier, + "added": assets_added, + "skipped": assets_skipped, + "s3_failed": assets_s3_failed, + } + + # Update STAC item if changes were made + if assets_updated > 0: + if not collection_id: + logger.error("Could not determine collection ID - cannot update item") + return { + "updated": 0, + "with_alternate_s3": total_with_alternate, + "with_tier": assets_with_tier, + "s3_failed": assets_s3_failed, + } + + client = Client.open(stac_api_url) + base_url = str(client.self_href).rstrip("/") + + # DELETE then POST (pgstac doesn't support PUT for items) + delete_url = f"{base_url}/collections/{collection_id}/items/{item_id}" + try: + assert client._stac_io is not None + resp = client._stac_io.session.delete(delete_url, timeout=30) + resp.raise_for_status() + logger.debug(f" Deleted existing {item_id}") + except Exception as e: + logger.warning(f" Failed to delete existing item (may not exist): {e}") + # Continue with POST anyway - might be first-time creation + + assert client._stac_io is not None + create_url = f"{base_url}/collections/{collection_id}/items" + resp = client._stac_io.session.post( + create_url, + json=item.to_dict(), + headers={"Content-Type": "application/json"}, + timeout=30, + ) + resp.raise_for_status() + logger.info(f" ✅ Updated {item_id} (HTTP {resp.status_code})") + else: + logger.info(" No changes needed") + + return {"updated": assets_updated, "with_tier": assets_with_tier, "added": 
assets_added} + + +def main(argv: list[str] | None = None) -> int: + """Main entry point.""" + parser = argparse.ArgumentParser( + description="Update STAC items with current S3 storage tier metadata" + ) + parser.add_argument("--stac-item-url", required=True, help="STAC item URL") + parser.add_argument("--stac-api-url", required=True, help="STAC API base URL for updates") + parser.add_argument("--s3-endpoint", required=True, help="S3 endpoint URL") + parser.add_argument("--dry-run", action="store_true", help="Dry run mode") + parser.add_argument( + "--add-missing", + action="store_true", + help="Add alternate.s3 to assets that don't have it (for legacy items)", + ) + + args = parser.parse_args(argv) + + try: + update_stac_item( + args.stac_item_url, + args.stac_api_url, + args.s3_endpoint, + args.dry_run, + args.add_missing, + ) + return 0 + except Exception as e: + logger.error(f"Failed: {e}", exc_info=True) + return 1 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/fixtures/update_storage_tier/README.md b/tests/fixtures/update_storage_tier/README.md new file mode 100644 index 0000000..f6ddaad --- /dev/null +++ b/tests/fixtures/update_storage_tier/README.md @@ -0,0 +1,28 @@ +# Test Fixtures for Storage Tier Update Tests + +This directory contains JSON fixtures used by `test_update_stac_storage_tier.py`. + +## Fixtures + +- **stac_item_before.json**: STAC item with existing `alternate.s3` (STANDARD tier) +- **stac_item_legacy.json**: Legacy STAC item without `alternate.s3` +- **stac_item_after_tier_change.json**: Expected result after tier change to GLACIER +- **stac_item_mixed_storage.json**: STAC item with mixed storage tier distribution +- **s3_storage_responses.json**: Mock S3 storage tier responses for different URLs + +## Usage + +Tests use pytest fixtures that load these JSON files: + +```python +@pytest.fixture +def stac_item_before(): + """STAC item with existing alternate.s3 (STANDARD tier).""" + with open(FIXTURES_DIR / "stac_item_before.json") as f: + return Item.from_dict(json.load(f)) +``` + +This approach makes tests: +- **Readable**: Test logic separated from test data +- **Maintainable**: Update fixtures without touching test code +- **Reusable**: Same fixtures across multiple tests diff --git a/tests/fixtures/update_storage_tier/s3_storage_responses.json b/tests/fixtures/update_storage_tier/s3_storage_responses.json new file mode 100644 index 0000000..3f86238 --- /dev/null +++ b/tests/fixtures/update_storage_tier/s3_storage_responses.json @@ -0,0 +1,14 @@ +{ + "s3://bucket/data.zarr/measurements/reflectance": { + "tier": "GLACIER", + "distribution": null + }, + "s3://bucket/data.zarr/quality/atmosphere": { + "tier": "STANDARD", + "distribution": {"STANDARD": 100} + }, + "s3://bucket/mixed.zarr/reflectance": { + "tier": "MIXED", + "distribution": {"STANDARD": 450, "GLACIER": 608} + } +} diff --git a/tests/fixtures/update_storage_tier/stac_item_after_tier_change.json b/tests/fixtures/update_storage_tier/stac_item_after_tier_change.json new file mode 100644 index 0000000..d571f06 --- /dev/null +++ b/tests/fixtures/update_storage_tier/stac_item_after_tier_change.json @@ -0,0 +1,35 @@ +{ + "stac_version": "1.0.0", + "type": "Feature", + "id": "test-item", + "geometry": {"type": "Polygon", "coordinates": [[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]]}, + "bbox": [0, 0, 1, 1], + "properties": { + "datetime": "2023-12-08T10:00:00Z" + }, + "assets": { + "reflectance": { + "href": "https://s3.de.io.cloud.ovh.net/bucket/data.zarr/measurements/reflectance", + 
"type": "application/vnd+zarr", + "roles": ["data"], + "alternate": { + "s3": { + "href": "s3://bucket/data.zarr/measurements/reflectance", + "storage:platform": "OVHcloud", + "storage:region": "de", + "storage:requester_pays": false, + "ovh:storage_tier": "GLACIER" + } + } + }, + "thumbnail": { + "href": "https://example.com/thumbnail.png", + "type": "image/png", + "roles": ["thumbnail"] + } + }, + "stac_extensions": [ + "https://stac-extensions.github.io/alternate-assets/v1.2.0/schema.json", + "https://stac-extensions.github.io/storage/v2.0.0/schema.json" + ] +} diff --git a/tests/fixtures/update_storage_tier/stac_item_before.json b/tests/fixtures/update_storage_tier/stac_item_before.json new file mode 100644 index 0000000..4e1ffea --- /dev/null +++ b/tests/fixtures/update_storage_tier/stac_item_before.json @@ -0,0 +1,35 @@ +{ + "stac_version": "1.0.0", + "type": "Feature", + "id": "test-item", + "geometry": {"type": "Polygon", "coordinates": [[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]]}, + "bbox": [0, 0, 1, 1], + "properties": { + "datetime": "2023-12-08T10:00:00Z" + }, + "assets": { + "reflectance": { + "href": "https://s3.de.io.cloud.ovh.net/bucket/data.zarr/measurements/reflectance", + "type": "application/vnd+zarr", + "roles": ["data"], + "alternate": { + "s3": { + "href": "s3://bucket/data.zarr/measurements/reflectance", + "storage:platform": "OVHcloud", + "storage:region": "de", + "storage:requester_pays": false, + "ovh:storage_tier": "STANDARD" + } + } + }, + "thumbnail": { + "href": "https://example.com/thumbnail.png", + "type": "image/png", + "roles": ["thumbnail"] + } + }, + "stac_extensions": [ + "https://stac-extensions.github.io/alternate-assets/v1.2.0/schema.json", + "https://stac-extensions.github.io/storage/v2.0.0/schema.json" + ] +} diff --git a/tests/fixtures/update_storage_tier/stac_item_legacy.json b/tests/fixtures/update_storage_tier/stac_item_legacy.json new file mode 100644 index 0000000..4a8582c --- /dev/null +++ b/tests/fixtures/update_storage_tier/stac_item_legacy.json @@ -0,0 +1,22 @@ +{ + "stac_version": "1.0.0", + "type": "Feature", + "id": "test-item-legacy", + "geometry": {"type": "Polygon", "coordinates": [[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]]}, + "bbox": [0, 0, 1, 1], + "properties": { + "datetime": "2023-12-08T10:00:00Z" + }, + "assets": { + "reflectance": { + "href": "https://s3.de.io.cloud.ovh.net/bucket/data.zarr/measurements/reflectance", + "type": "application/vnd+zarr", + "roles": ["data"] + }, + "thumbnail": { + "href": "https://example.com/thumbnail.png", + "type": "image/png", + "roles": ["thumbnail"] + } + } +} diff --git a/tests/fixtures/update_storage_tier/stac_item_mixed_storage.json b/tests/fixtures/update_storage_tier/stac_item_mixed_storage.json new file mode 100644 index 0000000..a1b50dc --- /dev/null +++ b/tests/fixtures/update_storage_tier/stac_item_mixed_storage.json @@ -0,0 +1,39 @@ +{ + "stac_version": "1.0.0", + "type": "Feature", + "id": "test-item", + "geometry": {"type": "Polygon", "coordinates": [[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]]}, + "bbox": [0, 0, 1, 1], + "properties": { + "datetime": "2023-12-08T10:00:00Z" + }, + "assets": { + "reflectance": { + "href": "https://s3.de.io.cloud.ovh.net/bucket/data.zarr/measurements/reflectance", + "type": "application/vnd+zarr", + "roles": ["data"], + "alternate": { + "s3": { + "href": "s3://bucket/data.zarr/measurements/reflectance", + "storage:platform": "OVHcloud", + "storage:region": "de", + "storage:requester_pays": false, + "ovh:storage_tier": "MIXED", + 
"ovh:storage_tier_distribution": { + "STANDARD": 450, + "GLACIER": 608 + } + } + } + }, + "thumbnail": { + "href": "https://example.com/thumbnail.png", + "type": "image/png", + "roles": ["thumbnail"] + } + }, + "stac_extensions": [ + "https://stac-extensions.github.io/alternate-assets/v1.2.0/schema.json", + "https://stac-extensions.github.io/storage/v2.0.0/schema.json" + ] +} diff --git a/tests/unit/test_storage_tier_utils.py b/tests/unit/test_storage_tier_utils.py new file mode 100644 index 0000000..047f5d3 --- /dev/null +++ b/tests/unit/test_storage_tier_utils.py @@ -0,0 +1,448 @@ +"""Unit tests for storage_tier_utils module.""" + +import sys +from pathlib import Path +from unittest.mock import Mock, patch + +from botocore.exceptions import ClientError + +# Add scripts directory to path +scripts_dir = Path(__file__).parent.parent.parent / "scripts" +sys.path.insert(0, str(scripts_dir)) + +from storage_tier_utils import ( # noqa: E402 + extract_region_from_endpoint, + get_s3_storage_class, + get_s3_storage_info, +) + + +class TestGetS3StorageClass: + """Tests for get_s3_storage_class function.""" + + @patch("storage_tier_utils.boto3") + def test_single_file_standard_tier(self, mock_boto3): + """Test querying a single file with STANDARD tier (no StorageClass field).""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_client.head_object.return_value = {} # No StorageClass = STANDARD + + # Test + result = get_s3_storage_class("s3://bucket/path/file.txt", "https://s3.endpoint.com") + + # Verify + assert result == "STANDARD" + mock_client.head_object.assert_called_once_with(Bucket="bucket", Key="path/file.txt") + + @patch("storage_tier_utils.boto3") + def test_single_file_standard_ia_tier(self, mock_boto3): + """Test querying a single file with STANDARD_IA tier.""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_client.head_object.return_value = {"StorageClass": "STANDARD_IA"} + + # Test + result = get_s3_storage_class("s3://bucket/path/file.txt", "https://s3.endpoint.com") + + # Verify + assert result == "STANDARD_IA" + + @patch("storage_tier_utils.boto3") + def test_single_file_express_onezone(self, mock_boto3): + """Test querying a single file with EXPRESS_ONEZONE tier.""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_client.head_object.return_value = {"StorageClass": "EXPRESS_ONEZONE"} + + # Test + result = get_s3_storage_class("s3://bucket/path/file.txt", "https://s3.endpoint.com") + + # Verify + assert result == "EXPRESS_ONEZONE" + + @patch("storage_tier_utils.boto3") + def test_zarr_directory_all_same_class(self, mock_boto3): + """Test Zarr directory where all files have same storage class.""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + # First head_object fails with 404 (it's a directory) + mock_client.head_object.side_effect = ClientError({"Error": {"Code": "404"}}, "HeadObject") + + # list_objects_v2 returns files with STANDARD_IA class + mock_client.list_objects_v2.return_value = { + "Contents": [ + {"Key": "data.zarr/0.0", "StorageClass": "STANDARD_IA"}, + {"Key": "data.zarr/0.1", "StorageClass": "STANDARD_IA"}, + {"Key": "data.zarr/.zarray", "StorageClass": "STANDARD_IA"}, + ] + } + + # Test + result = get_s3_storage_class("s3://bucket/data.zarr", "https://s3.endpoint.com") + + # Verify + assert result == "STANDARD_IA" + mock_client.list_objects_v2.assert_called_once() + + 
@patch("storage_tier_utils.boto3") + def test_zarr_directory_mixed_storage(self, mock_boto3, caplog): + """Test Zarr directory with mixed storage classes - returns most common.""" + + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + # First head_object fails with 404 + mock_client.head_object.side_effect = ClientError({"Error": {"Code": "404"}}, "HeadObject") + + # list_objects_v2 returns mixed storage classes (3 STANDARD_IA, 2 STANDARD) + mock_client.list_objects_v2.return_value = { + "Contents": [ + {"Key": "data.zarr/0.0", "StorageClass": "STANDARD_IA"}, + {"Key": "data.zarr/0.1", "StorageClass": "STANDARD_IA"}, + {"Key": "data.zarr/0.2", "StorageClass": "STANDARD_IA"}, + {"Key": "data.zarr/.zarray"}, # STANDARD (no StorageClass) + {"Key": "data.zarr/.zattrs"}, # STANDARD + ] + } + + # Test + result = get_s3_storage_class("s3://bucket/data.zarr", "https://s3.endpoint.com") + + # Verify - should return most common (STANDARD_IA: 3 vs STANDARD: 2) + assert result == "STANDARD_IA" + assert "Mixed storage classes detected" in caplog.text + assert "STANDARD_IA: 3/5" in caplog.text + assert "STANDARD: 2/5" in caplog.text + + @patch("storage_tier_utils.boto3") + def test_zarr_directory_empty(self, mock_boto3): + """Test Zarr directory with no files.""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + # First head_object fails with 404 + mock_client.head_object.side_effect = ClientError({"Error": {"Code": "404"}}, "HeadObject") + + # list_objects_v2 returns empty + mock_client.list_objects_v2.return_value = {} + + # Test + result = get_s3_storage_class("s3://bucket/data.zarr", "https://s3.endpoint.com") + + # Verify + assert result is None + + @patch("storage_tier_utils.boto3") + def test_non_s3_url(self, mock_boto3): + """Test with non-S3 URL.""" + result = get_s3_storage_class("https://example.com/file", "https://s3.endpoint.com") + assert result is None + + @patch("storage_tier_utils.boto3") + def test_s3_url_no_key(self, mock_boto3): + """Test with S3 URL but no key (root bucket).""" + result = get_s3_storage_class("s3://bucket/", "https://s3.endpoint.com") + assert result is None + + @patch("storage_tier_utils.boto3") + def test_permission_error(self, mock_boto3): + """Test handling of permission errors.""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_client.head_object.side_effect = ClientError({"Error": {"Code": "403"}}, "HeadObject") + + # Test + result = get_s3_storage_class("s3://bucket/file.txt", "https://s3.endpoint.com") + + # Verify + assert result is None + + @patch("storage_tier_utils.boto3") + def test_endpoint_parameter_used(self, mock_boto3): + """Test that s3_endpoint parameter is used for boto3 client.""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_client.head_object.return_value = {"StorageClass": "STANDARD"} + + # Test with specific endpoint + endpoint = "https://custom.s3.endpoint.com" + get_s3_storage_class("s3://bucket/file.txt", endpoint) + + # Verify endpoint was used + mock_boto3.client.assert_called_once_with("s3", endpoint_url=endpoint) + + @patch("storage_tier_utils.boto3") + @patch("storage_tier_utils.os.getenv") + def test_fallback_to_env_endpoint(self, mock_getenv, mock_boto3): + """Test fallback to AWS_ENDPOINT_URL environment variable.""" + # Setup mocks + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_client.head_object.return_value = {} + 
mock_getenv.return_value = "https://env.endpoint.com" + + # Test with no endpoint parameter + get_s3_storage_class("s3://bucket/file.txt", "") + + # Verify env endpoint was used + mock_boto3.client.assert_called_once_with("s3", endpoint_url="https://env.endpoint.com") + + +class TestExtractRegionFromEndpoint: + """Tests for extract_region_from_endpoint function.""" + + def test_de_region(self): + """Test extracting DE region.""" + assert extract_region_from_endpoint("https://s3.de.io.cloud.ovh.net") == "de" + + def test_gra_region(self): + """Test extracting GRA region.""" + assert extract_region_from_endpoint("https://s3.gra.io.cloud.ovh.net") == "gra" + + def test_sbg_region(self): + """Test extracting SBG region.""" + assert extract_region_from_endpoint("https://s3.sbg.io.cloud.ovh.net") == "sbg" + + def test_uk_region(self): + """Test extracting UK region.""" + assert extract_region_from_endpoint("https://s3.uk.io.cloud.ovh.net") == "uk" + + def test_ca_region(self): + """Test extracting CA region.""" + assert extract_region_from_endpoint("https://s3.ca.io.cloud.ovh.net") == "ca" + + def test_unknown_region(self): + """Test unknown region returns 'unknown'.""" + assert extract_region_from_endpoint("https://s3.unknown-region.com") == "unknown" + + def test_endpoint_without_protocol(self): + """Test endpoint without https:// protocol.""" + assert extract_region_from_endpoint("s3.de.io.cloud.ovh.net") == "de" + + def test_empty_endpoint(self): + """Test empty endpoint.""" + assert extract_region_from_endpoint("") == "unknown" + + +class TestGetS3StorageInfo: + """Tests for get_s3_storage_info function.""" + + @patch("storage_tier_utils.boto3") + def test_single_file_standard_tier(self, mock_boto3): + """Test querying a single file with STANDARD tier.""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_client.head_object.return_value = {} # No StorageClass = STANDARD + + # Test + result = get_s3_storage_info("s3://bucket/path/file.txt", "https://s3.endpoint.com") + + # Verify + assert result == {"tier": "STANDARD", "distribution": None} + mock_client.head_object.assert_called_once_with(Bucket="bucket", Key="path/file.txt") + + @patch("storage_tier_utils.boto3") + def test_single_file_standard_ia_tier(self, mock_boto3): + """Test querying a single file with STANDARD_IA tier.""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_client.head_object.return_value = {"StorageClass": "STANDARD_IA"} + + # Test + result = get_s3_storage_info("s3://bucket/path/file.txt", "https://s3.endpoint.com") + + # Verify + assert result == {"tier": "STANDARD_IA", "distribution": None} + + @patch("storage_tier_utils.boto3") + def test_zarr_directory_uniform_standard_ia(self, mock_boto3): + """Test Zarr directory where all files have same storage class.""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + # First head_object fails with 404 (it's a directory) + mock_client.head_object.side_effect = ClientError({"Error": {"Code": "404"}}, "HeadObject") + + # list_objects_v2 returns files with STANDARD_IA class + mock_client.list_objects_v2.return_value = { + "Contents": [ + {"Key": "data.zarr/0.0", "StorageClass": "STANDARD_IA"}, + {"Key": "data.zarr/0.1", "StorageClass": "STANDARD_IA"}, + {"Key": "data.zarr/.zarray", "StorageClass": "STANDARD_IA"}, + ] + } + + # Test + result = get_s3_storage_info("s3://bucket/data.zarr", "https://s3.endpoint.com") + + # Verify + assert result == {"tier": 
"STANDARD_IA", "distribution": {"STANDARD_IA": 3}} + + @patch("storage_tier_utils.boto3") + def test_zarr_directory_uniform_standard(self, mock_boto3): + """Test Zarr directory where all files are STANDARD.""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + # First head_object fails with 404 + mock_client.head_object.side_effect = ClientError({"Error": {"Code": "404"}}, "HeadObject") + + # list_objects_v2 returns files without StorageClass field + mock_client.list_objects_v2.return_value = { + "Contents": [ + {"Key": "data.zarr/0.0"}, # No StorageClass = STANDARD + {"Key": "data.zarr/0.1"}, + {"Key": "data.zarr/.zarray"}, + ] + } + + # Test + result = get_s3_storage_info("s3://bucket/data.zarr", "https://s3.endpoint.com") + + # Verify + assert result == {"tier": "STANDARD", "distribution": {"STANDARD": 3}} + + @patch("storage_tier_utils.boto3") + def test_zarr_directory_mixed_storage(self, mock_boto3, caplog): + """Test Zarr directory with mixed storage classes - should return MIXED.""" + import logging + + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + # First head_object fails with 404 + mock_client.head_object.side_effect = ClientError({"Error": {"Code": "404"}}, "HeadObject") + + # list_objects_v2 returns mixed storage classes + mock_client.list_objects_v2.return_value = { + "Contents": [ + {"Key": "data.zarr/0.0", "StorageClass": "STANDARD_IA"}, + {"Key": "data.zarr/0.1", "StorageClass": "STANDARD_IA"}, + {"Key": "data.zarr/0.2", "StorageClass": "STANDARD_IA"}, + {"Key": "data.zarr/.zarray"}, # STANDARD (no StorageClass) + {"Key": "data.zarr/.zattrs"}, # STANDARD + ] + } + + # Set log level to capture INFO messages + caplog.set_level(logging.INFO, logger="storage_tier_utils") + + # Test + result = get_s3_storage_info("s3://bucket/data.zarr", "https://s3.endpoint.com") + + # Verify + assert result["tier"] == "MIXED" + assert result["distribution"] == {"STANDARD_IA": 3, "STANDARD": 2} + assert "Mixed storage classes detected" in caplog.text + + @patch("storage_tier_utils.boto3") + def test_zarr_directory_large_mixed_storage(self, mock_boto3): + """Test Zarr directory with many files and mixed storage.""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + # First head_object fails with 404 + mock_client.head_object.side_effect = ClientError({"Error": {"Code": "404"}}, "HeadObject") + + # Simulate 100 files: 60 STANDARD_IA, 40 STANDARD + contents = [] + for i in range(60): + contents.append({"Key": f"data.zarr/chunk_{i}", "StorageClass": "STANDARD_IA"}) + for i in range(40): + contents.append({"Key": f"data.zarr/meta_{i}"}) # STANDARD (no StorageClass) + + mock_client.list_objects_v2.return_value = {"Contents": contents} + + # Test + result = get_s3_storage_info("s3://bucket/data.zarr", "https://s3.endpoint.com") + + # Verify + assert result["tier"] == "MIXED" + assert result["distribution"] == {"STANDARD_IA": 60, "STANDARD": 40} + + @patch("storage_tier_utils.boto3") + def test_zarr_directory_empty(self, mock_boto3): + """Test Zarr directory with no files.""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + # First head_object fails with 404 + mock_client.head_object.side_effect = ClientError({"Error": {"Code": "404"}}, "HeadObject") + + # list_objects_v2 returns empty + mock_client.list_objects_v2.return_value = {} + + # Test + result = get_s3_storage_info("s3://bucket/data.zarr", "https://s3.endpoint.com") + + # Verify + assert 
result is None + + @patch("storage_tier_utils.boto3") + def test_max_samples_parameter(self, mock_boto3): + """Test that max_samples parameter is respected.""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + + # First head_object fails with 404 + mock_client.head_object.side_effect = ClientError({"Error": {"Code": "404"}}, "HeadObject") + + mock_client.list_objects_v2.return_value = { + "Contents": [ + {"Key": f"data.zarr/{i}", "StorageClass": "STANDARD_IA"} for i in range(50) + ] + } + + # Test with custom max_samples + result = get_s3_storage_info( + "s3://bucket/data.zarr", "https://s3.endpoint.com", max_samples=50 + ) + + # Verify MaxKeys parameter was passed + mock_client.list_objects_v2.assert_called_once_with( + Bucket="bucket", Prefix="data.zarr/", MaxKeys=50 + ) + assert result == {"tier": "STANDARD_IA", "distribution": {"STANDARD_IA": 50}} + + @patch("storage_tier_utils.boto3") + def test_invalid_url(self, mock_boto3): + """Test with non-S3 URL.""" + result = get_s3_storage_info("https://example.com/file", "https://s3.endpoint.com") + assert result is None + + @patch("storage_tier_utils.boto3") + def test_no_key_in_url(self, mock_boto3): + """Test with S3 URL containing only bucket.""" + result = get_s3_storage_info("s3://bucket/", "https://s3.endpoint.com") + assert result is None + + @patch("storage_tier_utils.boto3") + def test_permission_error(self, mock_boto3): + """Test when access is denied (403 error).""" + # Setup mock + mock_client = Mock() + mock_boto3.client.return_value = mock_client + mock_client.head_object.side_effect = ClientError({"Error": {"Code": "403"}}, "HeadObject") + + # Test + result = get_s3_storage_info("s3://bucket/file.txt", "https://s3.endpoint.com") + + # Verify + assert result is None diff --git a/tests/unit/test_update_stac_storage_tier.py b/tests/unit/test_update_stac_storage_tier.py new file mode 100644 index 0000000..acdf034 --- /dev/null +++ b/tests/unit/test_update_stac_storage_tier.py @@ -0,0 +1,385 @@ +"""Unit tests for update_stac_storage_tier.py script.""" + +import json +import sys +from pathlib import Path +from unittest.mock import Mock, patch + +import pytest + +# Add scripts directory to path +scripts_dir = Path(__file__).parent.parent.parent / "scripts" +sys.path.insert(0, str(scripts_dir)) + +from pystac import Item # noqa: E402 + +# Fixtures directory +FIXTURES_DIR = Path(__file__).parent.parent / "fixtures" / "update_storage_tier" + + +@pytest.fixture +def stac_item_before(): + """STAC item with existing alternate.s3 (STANDARD tier).""" + with open(FIXTURES_DIR / "stac_item_before.json") as f: + return Item.from_dict(json.load(f)) + + +@pytest.fixture +def stac_item_legacy(): + """Legacy STAC item without alternate.s3.""" + with open(FIXTURES_DIR / "stac_item_legacy.json") as f: + return Item.from_dict(json.load(f)) + + +@pytest.fixture +def s3_responses(): + """Mock S3 storage tier responses.""" + with open(FIXTURES_DIR / "s3_storage_responses.json") as f: + return json.load(f) + + +class TestUpdateItemStorageTiers: + """Tests for update_item_storage_tiers function.""" + + @patch("update_stac_storage_tier.get_s3_storage_info") + def test_tier_change_updates_asset(self, mock_get_info, stac_item_before): + """Test updating storage tier when it changes.""" + from update_stac_storage_tier import update_item_storage_tiers + + mock_get_info.return_value = {"tier": "GLACIER", "distribution": None} + + updated, with_alt, with_tier, added, skipped, failed = update_item_storage_tiers( + 
stac_item_before, "https://s3.endpoint.com", add_missing=False + ) + + assert updated == 1 + assert with_alt == 1 + assert with_tier == 1 + assert ( + stac_item_before.assets["reflectance"].extra_fields["alternate"]["s3"][ + "ovh:storage_tier" + ] + == "GLACIER" + ) + + @patch("update_stac_storage_tier.get_s3_storage_info") + def test_no_update_when_tier_unchanged(self, mock_get_info, stac_item_before): + """Test no update when tier hasn't changed.""" + from update_stac_storage_tier import update_item_storage_tiers + + mock_get_info.return_value = {"tier": "STANDARD", "distribution": None} + + updated, with_alt, with_tier, added, skipped, failed = update_item_storage_tiers( + stac_item_before, "https://s3.endpoint.com", add_missing=False + ) + + assert updated == 0 + assert with_tier == 1 + + @patch("update_stac_storage_tier.get_s3_storage_info") + def test_removes_tier_on_s3_query_failure(self, mock_get_info, stac_item_before): + """Test tier removal when S3 query fails.""" + from update_stac_storage_tier import update_item_storage_tiers + + mock_get_info.return_value = None + + updated, with_alt, with_tier, added, skipped, failed = update_item_storage_tiers( + stac_item_before, "https://s3.endpoint.com", add_missing=False + ) + + assert updated == 1 + assert failed == 1 + assert ( + "ovh:storage_tier" + not in stac_item_before.assets["reflectance"].extra_fields["alternate"]["s3"] + ) + + @patch("update_stac_storage_tier.get_s3_storage_info") + @patch("update_stac_storage_tier.https_to_s3") + def test_add_missing_creates_alternate_s3( + self, mock_https_to_s3, mock_get_info, stac_item_legacy + ): + """Test add_missing mode creates alternate.s3 structure.""" + from update_stac_storage_tier import update_item_storage_tiers + + mock_https_to_s3.return_value = "s3://bucket/data.zarr/measurements/reflectance" + mock_get_info.return_value = {"tier": "STANDARD", "distribution": None} + + updated, with_alt, with_tier, added, skipped, failed = update_item_storage_tiers( + stac_item_legacy, "https://s3.de.io.cloud.ovh.net", add_missing=True + ) + + assert updated == 1 + assert added == 1 + s3_info = stac_item_legacy.assets["reflectance"].extra_fields["alternate"]["s3"] + assert s3_info["href"] == "s3://bucket/data.zarr/measurements/reflectance" + assert s3_info["ovh:storage_tier"] == "STANDARD" + assert s3_info["storage:platform"] == "OVHcloud" + + @patch("update_stac_storage_tier.get_s3_storage_info") + @patch("update_stac_storage_tier.https_to_s3") + def test_add_missing_skips_on_s3_failure( + self, mock_https_to_s3, mock_get_info, stac_item_legacy + ): + """Test add_missing mode skips asset when S3 query fails.""" + from update_stac_storage_tier import update_item_storage_tiers + + mock_https_to_s3.return_value = "s3://bucket/data.zarr/measurements/reflectance" + mock_get_info.return_value = None + + updated, with_alt, with_tier, added, skipped, failed = update_item_storage_tiers( + stac_item_legacy, "https://s3.endpoint.com", add_missing=True + ) + + assert updated == 0 + assert skipped == 1 + assert failed == 1 + assert "alternate" not in stac_item_legacy.assets["reflectance"].extra_fields + + @patch("update_stac_storage_tier.get_s3_storage_info") + def test_mixed_storage_adds_distribution(self, mock_get_info, stac_item_before): + """Test mixed storage adds distribution metadata.""" + from update_stac_storage_tier import update_item_storage_tiers + + mock_get_info.return_value = { + "tier": "MIXED", + "distribution": {"STANDARD": 450, "GLACIER": 608}, + } + + updated, with_alt, with_tier, 
added, skipped, failed = update_item_storage_tiers( + stac_item_before, "https://s3.endpoint.com", add_missing=False + ) + + assert updated == 1 + s3_info = stac_item_before.assets["reflectance"].extra_fields["alternate"]["s3"] + assert s3_info["ovh:storage_tier"] == "MIXED" + assert s3_info["ovh:storage_tier_distribution"] == {"STANDARD": 450, "GLACIER": 608} + + @patch("update_stac_storage_tier.get_s3_storage_info") + def test_uniform_zarr_adds_distribution(self, mock_get_info, stac_item_before): + """Test uniform Zarr includes distribution.""" + from update_stac_storage_tier import update_item_storage_tiers + + mock_get_info.return_value = {"tier": "GLACIER", "distribution": {"GLACIER": 100}} + + updated, with_alt, with_tier, added, skipped, failed = update_item_storage_tiers( + stac_item_before, "https://s3.endpoint.com", add_missing=False + ) + + assert updated == 1 + s3_info = stac_item_before.assets["reflectance"].extra_fields["alternate"]["s3"] + assert s3_info["ovh:storage_tier"] == "GLACIER" + assert s3_info["ovh:storage_tier_distribution"] == {"GLACIER": 100} + + @patch("update_stac_storage_tier.get_s3_storage_info") + def test_single_file_no_distribution(self, mock_get_info, stac_item_before): + """Test single file doesn't add distribution.""" + from update_stac_storage_tier import update_item_storage_tiers + + mock_get_info.return_value = {"tier": "GLACIER", "distribution": None} + + updated, with_alt, with_tier, added, skipped, failed = update_item_storage_tiers( + stac_item_before, "https://s3.endpoint.com", add_missing=False + ) + + assert updated == 1 + s3_info = stac_item_before.assets["reflectance"].extra_fields["alternate"]["s3"] + assert s3_info["ovh:storage_tier"] == "GLACIER" + assert "ovh:storage_tier_distribution" not in s3_info + + @patch("update_stac_storage_tier.get_s3_storage_info") + def test_removes_distribution_when_becomes_single_file(self, mock_get_info, stac_item_before): + """Test distribution removal when Zarr becomes single file.""" + from update_stac_storage_tier import update_item_storage_tiers + + # Add existing distribution + stac_item_before.assets["reflectance"].extra_fields["alternate"]["s3"][ + "ovh:storage_tier" + ] = "MIXED" + stac_item_before.assets["reflectance"].extra_fields["alternate"]["s3"][ + "ovh:storage_tier_distribution" + ] = {"STANDARD": 450, "GLACIER": 608} + + mock_get_info.return_value = {"tier": "GLACIER", "distribution": None} + + updated, with_alt, with_tier, added, skipped, failed = update_item_storage_tiers( + stac_item_before, "https://s3.endpoint.com", add_missing=False + ) + + assert updated == 1 + s3_info = stac_item_before.assets["reflectance"].extra_fields["alternate"]["s3"] + assert s3_info["ovh:storage_tier"] == "GLACIER" + assert "ovh:storage_tier_distribution" not in s3_info + + @patch("update_stac_storage_tier.get_s3_storage_info") + def test_skips_thumbnail_assets(self, mock_get_info, stac_item_before): + """Test thumbnail assets are skipped.""" + from update_stac_storage_tier import update_item_storage_tiers + + update_item_storage_tiers(stac_item_before, "https://s3.endpoint.com", add_missing=False) + + # Should only be called once (for reflectance, not thumbnail) + assert mock_get_info.call_count == 1 + + +class TestUpdateStacItem: + """Tests for update_stac_item function.""" + + @patch("update_stac_storage_tier.httpx.Client") + @patch("update_stac_storage_tier.update_item_storage_tiers") + def test_dry_run_skips_stac_update(self, mock_update_tiers, mock_httpx, stac_item_before): + """Test dry run doesn't 
update STAC API.""" + from update_stac_storage_tier import update_stac_item + + mock_response = Mock() + mock_response.json.return_value = stac_item_before.to_dict() + mock_http_client = Mock() + mock_http_client.get.return_value = mock_response + mock_http_client.__enter__ = Mock(return_value=mock_http_client) + mock_http_client.__exit__ = Mock(return_value=False) + mock_httpx.return_value = mock_http_client + + mock_update_tiers.return_value = (1, 1, 1, 0, 0, 0) + + result = update_stac_item( + "https://stac.api.com/collections/test/items/test-item", + "https://stac.api.com", + "https://s3.endpoint.com", + dry_run=True, + ) + + assert result["updated"] == 1 + + @patch("update_stac_storage_tier.httpx.Client") + @patch("update_stac_storage_tier.update_item_storage_tiers") + @patch("update_stac_storage_tier.Client") + def test_updates_stac_when_changes_made( + self, mock_client_class, mock_update_tiers, mock_httpx, stac_item_before + ): + """Test STAC API is updated when changes are made.""" + from update_stac_storage_tier import update_stac_item + + mock_response = Mock() + mock_response.json.return_value = stac_item_before.to_dict() + mock_http_client = Mock() + mock_http_client.get.return_value = mock_response + mock_http_client.__enter__ = Mock(return_value=mock_http_client) + mock_http_client.__exit__ = Mock(return_value=False) + mock_httpx.return_value = mock_http_client + + mock_update_tiers.return_value = (1, 1, 1, 0, 0, 0) + + mock_stac_client = Mock() + mock_stac_client.self_href = "https://stac.api.com" + mock_session = Mock() + mock_post_response = Mock() + mock_post_response.status_code = 201 + mock_session.post.return_value = mock_post_response + mock_session.delete.return_value = Mock() + mock_stac_client._stac_io.session = mock_session + mock_client_class.open.return_value = mock_stac_client + + result = update_stac_item( + "https://stac.api.com/collections/test/items/test-item", + "https://stac.api.com", + "https://s3.endpoint.com", + dry_run=False, + ) + + assert result["updated"] == 1 + mock_session.delete.assert_called_once() + mock_session.post.assert_called_once() + + @patch("update_stac_storage_tier.httpx.Client") + @patch("update_stac_storage_tier.update_item_storage_tiers") + def test_no_stac_update_when_no_changes(self, mock_update_tiers, mock_httpx, stac_item_before): + """Test STAC API not updated when no changes.""" + from update_stac_storage_tier import update_stac_item + + mock_response = Mock() + mock_response.json.return_value = stac_item_before.to_dict() + mock_http_client = Mock() + mock_http_client.get.return_value = mock_response + mock_http_client.__enter__ = Mock(return_value=mock_http_client) + mock_http_client.__exit__ = Mock(return_value=False) + mock_httpx.return_value = mock_http_client + + mock_update_tiers.return_value = (0, 1, 1, 0, 0, 0) + + result = update_stac_item( + "https://stac.api.com/collections/test/items/test-item", + "https://stac.api.com", + "https://s3.endpoint.com", + dry_run=False, + ) + + assert result["updated"] == 0 + + +class TestMain: + """Tests for main function.""" + + @patch("update_stac_storage_tier.update_stac_item") + def test_main_success(self, mock_update): + """Test main function success.""" + from update_stac_storage_tier import main + + mock_update.return_value = {"updated": 1, "with_tier": 1, "added": 0} + + exit_code = main( + [ + "--stac-item-url", + "https://stac.api.com/collections/col/items/item", + "--stac-api-url", + "https://stac.api.com", + "--s3-endpoint", + "https://s3.endpoint.com", + ] + ) + + 
assert exit_code == 0 + + @patch("update_stac_storage_tier.update_stac_item") + def test_main_with_flags(self, mock_update): + """Test main with --add-missing and --dry-run flags.""" + from update_stac_storage_tier import main + + mock_update.return_value = {"updated": 1, "with_tier": 1, "added": 1} + + exit_code = main( + [ + "--stac-item-url", + "https://stac.api.com/collections/col/items/item", + "--stac-api-url", + "https://stac.api.com", + "--s3-endpoint", + "https://s3.endpoint.com", + "--add-missing", + "--dry-run", + ] + ) + + assert exit_code == 0 + call_args = mock_update.call_args[0] + assert call_args[3] is True # dry_run + assert call_args[4] is True # add_missing + + @patch("update_stac_storage_tier.update_stac_item") + def test_main_handles_exception(self, mock_update): + """Test main handles exceptions.""" + from update_stac_storage_tier import main + + mock_update.side_effect = Exception("Test error") + + exit_code = main( + [ + "--stac-item-url", + "https://stac.api.com/collections/col/items/item", + "--stac-api-url", + "https://stac.api.com", + "--s3-endpoint", + "https://s3.endpoint.com", + ] + ) + + assert exit_code == 1
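
As a usage illustration, here is a minimal consumer-side sketch of how a client could read the `ovh:storage_tier` metadata that these scripts write into `alternate.s3`. It assumes the asset layout shown in the fixtures above; the helper names (`get_storage_tier`, `dominant_tier`) are illustrative only and not part of this change.

```python
"""Minimal sketch: reading ovh:storage_tier metadata from a STAC item dict.

Assumes the alternate.s3 layout written by register_v1.py /
update_stac_storage_tier.py; helper names here are illustrative only.
"""

from __future__ import annotations

import json
from pathlib import Path


def get_storage_tier(asset: dict) -> str | None:
    """Return the ovh:storage_tier recorded on an asset's alternate.s3, if any."""
    s3_alt = asset.get("alternate", {}).get("s3", {})
    return s3_alt.get("ovh:storage_tier")


def dominant_tier(asset: dict) -> str | None:
    """Resolve MIXED assets to their most common tier using the recorded distribution."""
    tier = get_storage_tier(asset)
    if tier != "MIXED":
        return tier
    s3_alt = asset.get("alternate", {}).get("s3", {})
    distribution = s3_alt.get("ovh:storage_tier_distribution") or {}
    if not distribution:
        return None
    # Highest object count wins; ties broken by tier name for determinism.
    return max(distribution.items(), key=lambda kv: (kv[1], kv[0]))[0]


if __name__ == "__main__":
    # Works against the mixed-storage fixture included in this change.
    fixture = Path("tests/fixtures/update_storage_tier/stac_item_mixed_storage.json")
    item = json.loads(fixture.read_text())
    asset = item["assets"]["reflectance"]
    print(get_storage_tier(asset))  # MIXED
    print(dominant_tier(asset))     # GLACIER (608 of 1058 sampled objects)
```

For a `MIXED` asset, `ovh:storage_tier_distribution` carries the per-tier object counts, so a client can recover the dominant tier without re-querying S3.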