Skip to content

Commit 59f4151

Browse files
authored
feat: add query stac for automate cron workflow (#57)
1 parent 8d402b5 commit 59f4151

File tree

6 files changed

+1959
-1532
lines changed

6 files changed

+1959
-1532
lines changed

scripts/query_stac.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Query STAC API for new items to process.
4+
5+
This script searches for items in a source collection within a specified time window
6+
and checks if they already exist in the target collection to avoid reprocessing.
7+
"""
8+
9+
import json
10+
import logging
11+
import os
12+
import sys
13+
from datetime import UTC, datetime, timedelta
14+
15+
from pystac_client import Client
16+
17+
# Configure logging
18+
logging.basicConfig(
19+
level=os.getenv("LOG_LEVEL", "INFO"),
20+
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
21+
)
22+
logger = logging.getLogger(__name__)
23+
24+
25+
def main() -> None:
    """Query a source STAC collection and emit a JSON list of items to process.

    Command-line arguments (supplied by the Argo workflow):
        1. STAC API root URL
        2. source collection id
        3. target collection id
        4. end-time offset in hours (window ends this many hours before now)
        5. lookback window length in hours
        6. AOI bounding box as a JSON array, e.g. "[2.0, 48.0, 4.0, 50.0]"

    Writes ONLY the resulting item list as JSON to stdout (consumed by Argo
    ``withParam``); all diagnostics go through logging (stderr by default).

    Raises:
        SystemExit: with status 2 if the argument count is wrong.
        ValueError: if hours arguments are not integers or the bbox is not
            valid JSON.
    """
    # Fail fast with a usage message instead of an opaque IndexError.
    if len(sys.argv) != 7:
        logger.error(
            "Usage: %s STAC_API_URL SOURCE_COLLECTION TARGET_COLLECTION "
            "END_TIME_OFFSET_HOURS LOOKBACK_HOURS AOI_BBOX_JSON",
            sys.argv[0],
        )
        sys.exit(2)

    stac_api_url = sys.argv[1]
    source_collection = sys.argv[2]
    target_collection = sys.argv[3]
    end_time_offset_hours = int(sys.argv[4])
    lookback_hours = int(sys.argv[5])
    aoi_bbox = json.loads(sys.argv[6])

    # Time window: [now - offset - lookback, now - offset], timezone-aware UTC.
    end_time = datetime.now(UTC) - timedelta(hours=end_time_offset_hours)
    start_time = end_time - timedelta(hours=lookback_hours)

    # STAC APIs expect the RFC 3339 "Z" suffix rather than "+00:00".
    start_time_str = start_time.isoformat().replace("+00:00", "Z")
    end_time_str = end_time.isoformat().replace("+00:00", "Z")

    logger.info("Querying STAC API: %s", stac_api_url)
    logger.info("Collection: %s", source_collection)
    logger.info("Time range: %s to %s", start_time_str, end_time_str)

    # Connect to the STAC catalog and search the source collection.
    catalog = Client.open(stac_api_url)
    search = catalog.search(
        collections=[source_collection],
        datetime=f"{start_time_str}/{end_time_str}",
        bbox=aoi_bbox,
    )

    items_to_process = []
    checked_count = 0

    for page in search.pages():
        for item in page.items:
            checked_count += 1

            # The item's canonical URL is carried on its "self" link.
            item_url = next(
                (link.href for link in item.links if link.rel == "self"),
                None,
            )
            if not item_url:
                logger.warning(f"Skipping {item.id}: No self link")
                continue

            # Check whether the item was already converted into the target
            # collection (prevents wasteful reprocessing). Short-circuit on
            # the first hit instead of materializing the whole result set.
            try:
                target_search = catalog.search(
                    collections=[target_collection],
                    ids=[item.id],
                )
                if next(iter(target_search.items()), None) is not None:
                    logger.info(f"Skipping {item.id}: Already converted")
                    continue
            except Exception as e:
                # Best-effort check: on failure, fall through and process
                # the item rather than risk silently dropping it.
                logger.warning(f"Could not check {item.id}: {e}")

            items_to_process.append(
                {
                    "source_url": item_url,
                    "collection": target_collection,
                    "item_id": item.id,
                }
            )

    logger.info(
        "📊 Summary: Checked %d items, %d to process",
        checked_count,
        len(items_to_process),
    )

    # Output ONLY JSON to stdout (for Argo withParam); logs go elsewhere.
    sys.stdout.write(json.dumps(items_to_process))
    sys.stdout.flush()


if __name__ == "__main__":
    main()

tests/fixtures/README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# Test Fixtures
2+
3+
Minimal JSON fixtures for testing the STAC query script.
4+
5+
## Files
6+
7+
### `stac_source_collection.json`
8+
Source collection with 3 items:
9+
- `item-001`: New item to process
10+
- `item-002`: Already exists in target (should be excluded)
11+
- `item-003`: New item to process
12+
13+
### `stac_target_collection.json`
14+
Target collection with already converted items:
15+
- `item-002`: Already processed
16+
17+
## Usage
18+
19+
These fixtures are loaded by `load_fixtures()` in the test file and converted to STAC items using `create_stac_item()`. Tests that need special scenarios (like items without self links or error conditions) create items directly in the test using `create_stac_item()` for flexibility.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{
2+
"collection_id": "sentinel-2-l2a",
3+
"items": [
4+
{
5+
"id": "item-001",
6+
"datetime": "2023-12-08T10:00:00Z",
7+
"bbox": [2.5, 48.5, 3.5, 49.5]
8+
},
9+
{
10+
"id": "item-002",
11+
"datetime": "2023-12-08T11:00:00Z",
12+
"bbox": [3.0, 49.0, 4.0, 50.0]
13+
},
14+
{
15+
"id": "item-003",
16+
"datetime": "2023-12-08T12:00:00Z",
17+
"bbox": [2.0, 48.0, 3.0, 49.0]
18+
}
19+
]
20+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
{
2+
"collection_id": "sentinel-2-l2a-staging",
3+
"items": [
4+
{
5+
"id": "item-002",
6+
"datetime": "2023-12-08T11:00:00Z",
7+
"bbox": [3.0, 49.0, 4.0, 50.0]
8+
}
9+
]
10+
}

0 commit comments

Comments
 (0)