3 changes: 2 additions & 1 deletion .env.example
@@ -2,5 +2,6 @@
export COLLECTION=conservation-area
export DATASET=conservation-area
export DATASET_NAME=conservation-area
-export TRANSFORMATION_OFFSET=1600
+export TRANSFORMATION_OFFSET=1200
+export TRANSFORMATION_LIMIT=200
# export REPROCESS=''
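
Note: TRANSFORMATION_OFFSET and TRANSFORMATION_LIMIT together describe a batch window over the ordered resource list. A minimal sketch of the assumed slicing semantics, with invented names and data:

# Illustrative only: how an offset/limit pair is assumed to select a batch
# from a stable ordered resource list.
resources = [f"resource-{i}" for i in range(2000)]  # stand-in data

offset, limit = 1200, 200  # the values set in .env.example above
batch = resources[offset:offset + limit]

assert len(batch) == 200
assert batch[0] == "resource-1200"  # boundaries depend only on list order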
2 changes: 2 additions & 0 deletions .gitignore
@@ -16,6 +16,8 @@
/expectations
/task/metadata.json
/metadata.json
+/task/state.json
+/state.json
/task/specification
/specification
/task/issue
12 changes: 10 additions & 2 deletions bin/download_resources.py
@@ -13,7 +13,7 @@
logger = logging.getLogger(__name__)


-def download_resources(collection, collection_dir: str, bucket=None, base_url=None, collection_name=None, dataset=None, transformaiton_offset=None, transformation_limit=None, dataset_resource_dir="var/dataset-resource/", pipeline_dir="pipeline/", specification_dir="specification/", reprocess=False, max_threads=4) -> None:
+def download_resources(collection, collection_dir: str, bucket=None, base_url=None, collection_name=None, dataset=None, transformaiton_offset=None, transformation_limit=None, dataset_resource_dir="var/dataset-resource/", pipeline_dir="pipeline/", specification_dir="specification/", reprocess=False, max_threads=4, state_path=None) -> None:
"""Download resources for a collection.

Uses the same selection logic as transform_resources (via select_resources_to_process)
@@ -59,6 +59,7 @@ def download_resources(collection, collection_dir: str, bucket=None, base_url=No
offset=transformaiton_offset,
limit=transformation_limit,
reprocess=reprocess,
+state_path=state_path,
)

# Extract unique resources to download (a resource only needs to be downloaded once)
@@ -154,6 +155,12 @@ def download_resources(collection, collection_dir: str, bucket=None, base_url=No
default="specification/",
help="Path to the specification directory (used for specification hash)"
)
+@click.option(
+"--state-path",
+default=None,
+type=click.Path(exists=True),
+help="Path to state.json for stable ordered resource list"
+)
@click.option(
"--reprocess",
is_flag=True,
@@ -176,7 +183,7 @@ def download_resources(collection, collection_dir: str, bucket=None, base_url=No
is_flag=True,
help="Enable debug logging"
)
-def run_command(collection_dir, bucket, base_url, collection_name, dataset, offset, limit, dataset_resource_dir, pipeline_dir, specification_dir, reprocess, max_threads, quiet, debug):
+def run_command(collection_dir, bucket, base_url, collection_name, dataset, offset, limit, dataset_resource_dir, pipeline_dir, specification_dir, state_path, reprocess, max_threads, quiet, debug):
"""Download resources for a collection from S3 or HTTP(S) URLs.

Either --bucket or --base-url must be provided.
@@ -218,6 +225,7 @@ def run_command(collection_dir, bucket, base_url, collection_name, dataset, offs
specification_dir=specification_dir,
reprocess=reprocess,
max_threads=max_threads,
+state_path=state_path,
)
click.echo("Download complete!")
except ValueError as e:
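
As the comment in the diff above notes, a resource only needs to be downloaded once, so the selected (dataset, resource) pairs are de-duplicated before downloading. A minimal sketch of that step, with invented pair data:

# Invented pairs standing in for the output of select_resources_to_process
pairs = [
    ("conservation-area", "0386bb"),
    ("conservation-area", "0386bb"),  # duplicate: same resource selected twice
    ("conservation-area", "a41c9f"),
]

# Each resource is fetched once, however many pairs reference it
unique_resources = {resource for _, resource in pairs}
assert unique_resources == {"0386bb", "a41c9f"}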
34 changes: 29 additions & 5 deletions bin/transform.sh
@@ -38,10 +38,34 @@ make init
echo "Step 3: Building collection database..."
make collection

-# Step 4: Download dataset resource logs (used to skip already up-to-date resources)
+# Step 4: Download state.json (used for stable batch ordering) and dataset resource logs
+# Use local state.json if it exists, allowing manual edits
+STATE_PATH=${STATE_PATH:-"state.json"}
+
+if [ -f "$STATE_PATH" ]; then
+echo "Step 4a: Using existing state.json at $STATE_PATH"
+else
+echo "Step 4a: Downloading state.json..."
+if [ -n "$COLLECTION_DATASET_BUCKET_NAME" ]; then
+aws s3 cp "s3://${COLLECTION_DATASET_BUCKET_NAME}/${COLLECTION_NAME}-collection/state.json" "$STATE_PATH"
+elif [ -n "$DATASTORE_URL" ]; then
+base=$(echo "$DATASTORE_URL" | sed 's:/*$::')
+curl --fail -o "$STATE_PATH" "${base}/${COLLECTION_NAME}-collection/state.json"
+else
+echo "Error: Either COLLECTION_DATASET_BUCKET_NAME or DATASTORE_URL must be set"
+exit 1
+fi
+
+if [ ! -f "$STATE_PATH" ]; then
+echo "Error: Failed to download state.json"
+exit 1
+fi
+fi
+
+# Download dataset resource logs (used to skip already up-to-date resources within a batch)
# Skipped when REPROCESS is set - logs will be freshly written by the transform step
if [ -z "$REPROCESS" ]; then
echo "Step 4: Downloading dataset resource logs..."
echo "Step 4b: Downloading dataset resource logs..."

DATASET_RESOURCE_CMD="python bin/download_dataset_resource.py --collection-dir $COLLECTION_DIR --collection-name $COLLECTION_NAME --dataset-resource-dir $DATASET_RESOURCE_DIR"

@@ -61,14 +85,14 @@ if [ -z "$REPROCESS" ]; then
echo "Command: $DATASET_RESOURCE_CMD"
eval $DATASET_RESOURCE_CMD
else
echo "Step 4: Skipping dataset resource log download - REPROCESS is set, logs will be written fresh"
echo "Step 4b: Skipping dataset resource log download - REPROCESS is set, logs will be written fresh"
fi

# Step 5: Download resources
echo "Step 5: Downloading resources..."

# Build the download command
-DOWNLOAD_CMD="python bin/download_resources.py --collection-dir $COLLECTION_DIR"
+DOWNLOAD_CMD="python bin/download_resources.py --collection-dir $COLLECTION_DIR --state-path $STATE_PATH"

# Add bucket or base URL (bucket takes precedence, matching makefile convention)
if [ -n "$COLLECTION_DATASET_BUCKET_NAME" ]; then
@@ -111,7 +135,7 @@ echo "Resources downloaded successfully"
# Step 6: Transform resources using Python multiprocessing
echo "Step 6: Transforming resources..."

-TRANSFORM_CMD="python bin/transform_resources.py --collection-dir $COLLECTION_DIR"
+TRANSFORM_CMD="python bin/transform_resources.py --collection-dir $COLLECTION_DIR --state-path $STATE_PATH"

# Add directory parameters
TRANSFORM_CMD="$TRANSFORM_CMD --pipeline-dir $PIPELINE_DIR"
8 changes: 8 additions & 0 deletions bin/transform_resources.py
@@ -87,6 +87,12 @@
default="specification/",
help="Path to the specification directory"
)
+@click.option(
+"--state-path",
+default=None,
+type=click.Path(exists=True),
+help="Path to state.json for stable ordered resource list"
+)
@click.option(
"--reprocess",
is_flag=True,
@@ -119,6 +125,7 @@ def run_command(
offset,
limit,
max_workers,
+state_path,
reprocess,
quiet,
debug
@@ -148,6 +155,7 @@ def run_command(
offset=offset,
limit=limit,
max_workers=max_workers,
+state_path=state_path,
reprocess=reprocess,
)

55 changes: 46 additions & 9 deletions src/collection_task/filtering.py
@@ -1,5 +1,6 @@
"""Filtering and resource management functions for collection tasks."""

+import json
import logging
from typing import Dict, List, Optional, Tuple

@@ -89,6 +90,31 @@ def apply_offset_and_limit(
return dataset_resource_pairs


+def load_state_resources(state_path: str, dataset: str) -> List[Tuple[str, str]]:
+"""Load the ordered resource list for a dataset from a state.json file.
+
+Args:
+state_path: Path to state.json
+dataset: Dataset name to load resources for
+
+Returns:
+List of (dataset, resource) tuples in the order defined by state.json
+
+Raises:
+FileNotFoundError: If state_path does not exist
+KeyError: If state.json does not contain 'transform_resources' or dataset is not found
+"""
+with open(state_path) as f:
+state = json.load(f)
+
+transform_resources = state["transform_resources"]
+
+if dataset not in transform_resources:
+raise KeyError(f"Dataset '{dataset}' not found in state.json transform_resources")
+
+return [(dataset, resource) for resource in transform_resources[dataset]]
+
+
def select_resources_to_process(
dataset_resource_map: Dict[str, List[str]],
dataset_resource_dir: str,
@@ -98,31 +124,42 @@ def select_resources_to_process(
offset: Optional[int] = None,
limit: Optional[int] = None,
reprocess: bool = False,
+state_path: Optional[str] = None,
) -> List[Tuple[str, str]]:
"""Select the (dataset, resource) pairs to process or download.

-Applies the same logic in both download_resources and transform_resources
-so the two steps always operate on the same set of resources:
+If state_path is provided, the stable ordered list from state.json is used
+as the base for offset/limit. Otherwise falls back to building from
+dataset_resource_map. The skip check (resource_needs_processing) is always
+applied after offset/limit so batch boundaries remain stable.

-1. Build the full list of (dataset, resource) pairs
-2. If not reprocess: filter to only those whose dataset-resource log is
-out-of-date (different code version, config hash, or spec hash)
-3. Apply offset/limit to the resulting list
+1. Build the ordered list (from state.json if available, else dataset_resource_map)
+2. Apply offset/limit to get a stable batch slice
+3. If not reprocess: filter the slice to only those needing processing

Args:
dataset_resource_map: Dictionary mapping datasets to lists of resources
dataset_resource_dir: Path to dataset resource logs
pipeline_dir: Path to pipeline config directory (used for config hash)
specification_dir: Path to specification directory (used for spec hash)
dataset: Optional dataset name to filter to
-offset: Optional offset into the final list
+offset: Optional offset into the stable ordered list
limit: Optional maximum number of pairs to return
reprocess: If True, skip the dataset-resource log check
+state_path: Optional path to state.json for stable ordered resource list

Returns:
List of (dataset, resource) tuples to process
"""
-pairs = build_dataset_resource_pairs(dataset_resource_map, dataset=dataset)
+if state_path:
+if not dataset:
+raise ValueError("dataset is required when state_path is provided")
+pairs = load_state_resources(state_path, dataset=dataset)
+else:
+pairs = build_dataset_resource_pairs(dataset_resource_map, dataset=dataset)
+
+# Apply offset/limit to stable list first so batch boundaries never shift
+pairs = apply_offset_and_limit(pairs, offset=offset, limit=limit, dataset=dataset)
+
if not reprocess:
config_hash = hash_directory(pipeline_dir)
@@ -141,7 +178,7 @@ def select_resources_to_process(
f"{len(pairs)} to process"
)

-return apply_offset_and_limit(pairs, offset=offset, limit=limit, dataset=dataset)
+return pairs


def build_retired_resources_set(old_resource_entries: List[Dict]) -> set:
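
The new load_state_resources helper reads a transform_resources mapping (dataset name to ordered resource list) from state.json. A minimal sketch of that shape and of the selection order this PR establishes (slice first, skip-check second); the JSON content is invented for illustration:

import json
import tempfile

# Invented example of the shape load_state_resources expects:
# {"transform_resources": {"<dataset>": ["<resource>", ...]}}
state = {"transform_resources": {"conservation-area": ["res-a", "res-b", "res-c", "res-d"]}}

with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump(state, f)
    state_path = f.name

# Mirrors load_state_resources: an ordered list of (dataset, resource) tuples
with open(state_path) as f:
    resources = json.load(f)["transform_resources"]["conservation-area"]
pairs = [("conservation-area", r) for r in resources]

# Offset/limit is applied to this stable list before the up-to-date check
offset, limit = 1, 2
batch = pairs[offset:offset + limit]
assert batch == [("conservation-area", "res-b"), ("conservation-area", "res-c")]

Slicing before the skip check is the behavioural change here: previously offset/limit was applied after the filter, so a batch's contents could shift as earlier resources became up to date.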
2 changes: 2 additions & 0 deletions src/collection_task/transform.py
@@ -114,6 +114,7 @@ def process_resources(
limit=None,
max_workers=None,
reprocess=False,
+state_path=None,
):
"""Process resources using multiprocessing.

@@ -149,6 +150,7 @@ def process_resources(
offset=offset,
limit=limit,
reprocess=reprocess,
+state_path=state_path,
)

tasks = []