Skip to content

Commit 451465f

Browse files
committed
fix: ensure --crs-groups flag set for S2 conversions
The --crs-groups flag triggers prepare_dataset_with_crs_info() in data-model, which writes CRS metadata via ds.rio.write_crs() and creates the spatial_ref coordinate variable required by TiTiler validation. Restores working configuration from commit 21ea009.

fix: update sentinel-2 conversion parameters for accuracy

feat: add enable-sharding parameter support
- Add enable_sharding to mission configs (S2: true, S1: false)
- Pass enable_sharding to create_geozarr_dataset kwargs
- Add shell/JSON output support for new parameter
- Align with data-model launch.json reference config
- Remove Dask cluster parameter (use ENABLE_DASK_CLUSTER env var instead)

Refs: data-model/.vscode/launch.json L2A config
1 parent 2132d65 commit 451465f

File tree

5 files changed

+119
-82
lines changed

5 files changed

+119
-82
lines changed

scripts/augment_stac_item.py

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -66,9 +66,8 @@ def add_visualization(item: Item, raster_base: str, collection_id: str) -> None:
6666
_add_tile_links(item, base_url, query, "Sentinel-1 GRD VH")
6767

6868
elif coll_lower.startswith(("sentinel-2", "sentinel2")):
69-
# S2: Point to overview level 0 for quicklook TCI
70-
# Use /r10m/0/tci path to access the overview array with spatial_ref
71-
var_path = "/quality/l2a_quicklook/r10m/0/tci"
69+
# S2: Use colon separator for TiTiler variable path
70+
var_path = "/quality/l2a_quicklook/r10m:tci"
7271
query = (
7372
f"variables={urllib.parse.quote(var_path, safe='')}&bidx=1&bidx=2&bidx=3&assets=TCI_10m"
7473
)

scripts/convert.py

Lines changed: 59 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -77,10 +77,11 @@ def run_conversion(
7777
# Get conversion parameters from collection config
7878
logger.debug(f"Getting conversion parameters for {collection}...")
7979
params = get_conversion_params(collection)
80-
logger.debug(f" Groups: {params['groups']}")
81-
logger.debug(f" Chunk: {params['spatial_chunk']}")
82-
logger.debug(f" Tile width: {params['tile_width']}")
83-
logger.debug(f" Extra flags: {params['extra_flags']}")
80+
logger.debug(f" Groups: {params['groups']}")
81+
logger.debug(f" Chunk: {params['spatial_chunk']}")
82+
logger.debug(f" Tile width: {params['tile_width']}")
83+
logger.debug(f" Extra flags: {params['extra_flags']}")
84+
logger.debug(f" Enable sharding: {params['enable_sharding']}")
8485

8586
# Construct output path
8687
output_url = f"s3://{s3_output_bucket}/{s3_output_prefix}/{collection}/{item_id}.zarr"
@@ -98,46 +99,60 @@ def run_conversion(
9899
logger.info(f" Source: {zarr_url}")
99100
logger.info(f" Destination: {output_url}")
100101

101-
# Set up Dask cluster for parallel processing
102-
from dask.distributed import Client
103-
104-
with Client() as client:
105-
logger.info(f"🚀 Dask cluster started: {client.dashboard_link}")
106-
107-
# Load source dataset
108-
logger.info("Loading source dataset...")
109-
storage_options = get_storage_options(zarr_url)
110-
dt = xr.open_datatree(
111-
zarr_url,
112-
engine="zarr",
113-
chunks="auto",
114-
storage_options=storage_options,
115-
)
116-
logger.info(f"Loaded DataTree with {len(dt.children)} groups")
117-
118-
# Convert to GeoZarr
119-
logger.info("Converting to GeoZarr format...")
120-
121-
# Parse extra flags for optional parameters
122-
kwargs = {}
123-
if params["extra_flags"] and "--crs-groups" in params["extra_flags"]:
124-
crs_groups_str = params["extra_flags"].split("--crs-groups")[1].strip().split()[0]
125-
kwargs["crs_groups"] = [crs_groups_str]
126-
127-
# groups parameter must be a list
128-
groups_list = [params["groups"]] if isinstance(params["groups"], str) else params["groups"]
129-
130-
create_geozarr_dataset(
131-
dt_input=dt,
132-
groups=groups_list,
133-
output_path=output_url,
134-
spatial_chunk=params["spatial_chunk"],
135-
tile_width=params["tile_width"],
136-
**kwargs,
137-
)
138-
139-
logger.info("✅ Conversion completed successfully!")
140-
logger.info(f"Output: {output_url}")
102+
# Optional: Set up Dask cluster if enabled via environment variable
103+
# Note: eopf-geozarr handles its own Dask setup when using create_geozarr_dataset
104+
# This is here only for future compatibility if we need external cluster management
105+
use_dask = os.getenv("ENABLE_DASK_CLUSTER", "").lower() in ("true", "1", "yes")
106+
if use_dask:
107+
logger.info("🚀 Dask cluster enabled via ENABLE_DASK_CLUSTER env var")
108+
# Future: Could connect to external cluster here if needed
109+
# from dask.distributed import Client
110+
# dask_address = os.getenv("DASK_SCHEDULER_ADDRESS")
111+
# client = Client(dask_address) if dask_address else Client()
112+
113+
# Load source dataset
114+
logger.info("Loading source dataset...")
115+
storage_options = get_storage_options(zarr_url)
116+
dt = xr.open_datatree(
117+
zarr_url,
118+
engine="zarr",
119+
chunks="auto",
120+
storage_options=storage_options,
121+
)
122+
logger.info(f"Loaded DataTree with {len(dt.children)} groups")
123+
124+
# Convert to GeoZarr
125+
logger.info("Converting to GeoZarr format...")
126+
127+
# Parse extra flags for optional parameters
128+
kwargs = {}
129+
if params["extra_flags"] and "--crs-groups" in params["extra_flags"]:
130+
crs_groups_str = params["extra_flags"].split("--crs-groups")[1].strip().split()[0]
131+
kwargs["crs_groups"] = [crs_groups_str]
132+
133+
# Add sharding if enabled
134+
if params.get("enable_sharding", False):
135+
kwargs["enable_sharding"] = True
136+
137+
# groups parameter must be a list
138+
groups_param = params["groups"]
139+
if isinstance(groups_param, str):
140+
groups_list: list[str] = [groups_param]
141+
else:
142+
# groups_param is list[str] in mission configs
143+
groups_list = list(groups_param) if groups_param else []
144+
145+
create_geozarr_dataset(
146+
dt_input=dt,
147+
groups=groups_list,
148+
output_path=output_url,
149+
spatial_chunk=params["spatial_chunk"],
150+
tile_width=params["tile_width"],
151+
**kwargs,
152+
)
153+
154+
logger.info("✅ Conversion completed successfully!")
155+
logger.info(f"Output: {output_url}")
141156

142157
return output_url
143158

scripts/create_geozarr_item.py

Lines changed: 0 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,6 @@
55

66
import argparse
77
import logging
8-
import re
98
from typing import Any
109
from urllib.parse import urlparse
1110

@@ -31,30 +30,6 @@ def s3_to_https(s3_url: str, endpoint: str) -> str:
3130
return f"https://{bucket}.{host}/{path}"
3231

3332

34-
def normalize_r60m_href(href: str) -> str:
35-
"""Add /0/ subdirectory to r10m/r20m/r60m paths to match GeoZarr output structure.
36-
37-
GeoZarr conversion creates /0/ subdirectories (overview level 0) for all
38-
resolution bands. This normalizes asset hrefs accordingly.
39-
40-
Example: .../r10m/tci → .../r10m/0/tci
41-
.../r60m/b09 → .../r60m/0/b09
42-
"""
43-
# Check for any resolution level pattern
44-
for res in ["r10m", "r20m", "r60m"]:
45-
if f"/{res}/" not in href:
46-
continue
47-
48-
# If already has /0/ or other digit subdirectory, don't modify
49-
if re.search(rf"/{res}/\d+/", href):
50-
continue
51-
52-
# Insert /0/ after /{res}/
53-
href = re.sub(rf"/({res})/", r"/\1/0/", href)
54-
55-
return href
56-
57-
5833
def find_source_zarr_base(source_item: dict) -> str | None:
5934
"""Find the base Zarr URL from source item assets."""
6035
for asset in source_item.get("assets", {}).values():
@@ -112,9 +87,6 @@ def create_geozarr_item(
11287
subpath = old_href[len(source_zarr_base) :]
11388
new_href = output_zarr_base + subpath
11489

115-
# Normalize r60m paths to include /0/ subdirectory (GeoZarr structure)
116-
new_href = normalize_r60m_href(new_href)
117-
11890
# Convert to https if needed
11991
if new_href.startswith("s3://"):
12092
new_href = s3_to_https(new_href, s3_endpoint)

scripts/get_conversion_params.py

Lines changed: 20 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -25,12 +25,19 @@
2525
"extra_flags": "--gcp-group /conditions/gcp",
2626
"spatial_chunk": 4096,
2727
"tile_width": 512,
28+
"enable_sharding": False,
2829
},
2930
"sentinel-2": {
30-
"groups": "/quality/l2a_quicklook/r10m",
31-
"extra_flags": "--crs-groups /quality/l2a_quicklook/r10m",
32-
"spatial_chunk": 4096,
33-
"tile_width": 512,
31+
"groups": [
32+
"/measurements/reflectance/r10m",
33+
"/measurements/reflectance/r20m",
34+
"/measurements/reflectance/r60m",
35+
"/quality/l2a_quicklook/r10m",
36+
],
37+
"extra_flags": "--crs-groups /conditions/geometry",
38+
"spatial_chunk": 1024,
39+
"tile_width": 256,
40+
"enable_sharding": True,
3441
},
3542
}
3643

@@ -42,7 +49,7 @@ def get_conversion_params(collection_id: str) -> dict[str, Any]:
4249
collection_id: Collection identifier (e.g., sentinel-1-l1-grd, sentinel-2-l2a-dp-test)
4350
4451
Returns:
45-
Dict of conversion parameters (groups, extra_flags, spatial_chunk, tile_width)
52+
Dict of conversion parameters (groups, extra_flags, spatial_chunk, tile_width, enable_sharding)
4653
"""
4754
# Extract mission prefix (sentinel-1 or sentinel-2)
4855
parts = collection_id.lower().split("-")
@@ -73,7 +80,7 @@ def main(argv: list[str] | None = None) -> int:
7380
)
7481
parser.add_argument(
7582
"--param",
76-
choices=["groups", "extra_flags", "spatial_chunk", "tile_width"],
83+
choices=["groups", "extra_flags", "spatial_chunk", "tile_width", "enable_sharding"],
7784
help="Get single parameter (for shell scripts)",
7885
)
7986

@@ -82,7 +89,12 @@ def main(argv: list[str] | None = None) -> int:
8289

8390
if args.param:
8491
# Output single parameter (for shell variable assignment)
85-
print(params.get(args.param, ""))
92+
value = params.get(args.param, "")
93+
# Convert boolean to shell-friendly format
94+
if isinstance(value, bool):
95+
print("true" if value else "false")
96+
else:
97+
print(value if value is not None else "")
8698
elif args.format == "json":
8799
# Output JSON (for parsing with jq)
88100
print(json.dumps(params, indent=2))
@@ -92,6 +104,7 @@ def main(argv: list[str] | None = None) -> int:
92104
print(f"EXTRA_FLAGS='{params['extra_flags']}'")
93105
print(f"CHUNK={params['spatial_chunk']}")
94106
print(f"TILE_WIDTH={params['tile_width']}")
107+
print(f"ENABLE_SHARDING={'true' if params['enable_sharding'] else 'false'}")
95108

96109
return 0
97110

scripts/get_zarr_url.py

Lines changed: 38 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,38 @@
1+
#!/usr/bin/env python3
2+
"""Extract Zarr URL from STAC item - standalone script for workflow templates."""
3+
4+
import sys
5+
6+
import httpx
7+
8+
9+
def get_zarr_url(stac_item_url: str) -> str:
10+
"""Get Zarr asset URL from STAC item."""
11+
r = httpx.get(stac_item_url, timeout=30.0, follow_redirects=True)
12+
r.raise_for_status()
13+
assets = r.json().get("assets", {})
14+
15+
# Priority: product, zarr, then any .zarr asset
16+
for key in ["product", "zarr"]:
17+
if key in assets and (href := assets[key].get("href")):
18+
return str(href)
19+
20+
# Fallback: any asset with .zarr in href
21+
for asset in assets.values():
22+
if ".zarr" in asset.get("href", ""):
23+
return str(asset["href"])
24+
25+
raise RuntimeError("No Zarr asset found in STAC item")
26+
27+
28+
if __name__ == "__main__":
29+
if len(sys.argv) != 2:
30+
print("Usage: get_zarr_url.py <stac_item_url>", file=sys.stderr)
31+
sys.exit(1)
32+
33+
try:
34+
zarr_url = get_zarr_url(sys.argv[1])
35+
print(zarr_url)
36+
except Exception as e:
37+
print(f"Error: {e}", file=sys.stderr)
38+
sys.exit(1)

0 commit comments

Comments
 (0)