diff --git a/.env.example b/.env.example
index e84fb31..65cb8c8 100644
--- a/.env.example
+++ b/.env.example
@@ -2,5 +2,6 @@
 export COLLECTION=conservation-area
 export DATASET=conservation-area
 export DATASET_NAME=conservation-area
-export TRANSFORMATION_OFFSET=1600
+export TRANSFORMATION_OFFSET=1200
 export TRANSFORMATION_LIMIT=200
+# export REPROCESS=''
diff --git a/.gitignore b/.gitignore
index a960cc6..6a7b085 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,6 +16,8 @@
 /expectations
 /task/metadata.json
 /metadata.json
+/task/state.json
+/state.json
 /task/specification
 /specification
 /task/issue
diff --git a/bin/download_resources.py b/bin/download_resources.py
index 0f974ba..1ca1322 100644
--- a/bin/download_resources.py
+++ b/bin/download_resources.py
@@ -13,7 +13,7 @@
 logger = logging.getLogger(__name__)


-def download_resources(collection, collection_dir: str, bucket=None, base_url=None, collection_name=None, dataset=None, transformaiton_offset=None, transformation_limit=None, dataset_resource_dir="var/dataset-resource/", pipeline_dir="pipeline/", specification_dir="specification/", reprocess=False, max_threads=4) -> None:
+def download_resources(collection, collection_dir: str, bucket=None, base_url=None, collection_name=None, dataset=None, transformaiton_offset=None, transformation_limit=None, dataset_resource_dir="var/dataset-resource/", pipeline_dir="pipeline/", specification_dir="specification/", reprocess=False, max_threads=4, state_path=None) -> None:
     """Download resources for a collection.

     Uses the same selection logic as transform_resources (via select_resources_to_process)
@@ -59,6 +59,7 @@ def download_resources(collection, collection_dir: str, bucket=None, base_url=No
         offset=transformaiton_offset,
         limit=transformation_limit,
         reprocess=reprocess,
+        state_path=state_path,
     )

     # Extract unique resources to download (a resource only needs to be downloaded once)
@@ -154,6 +155,12 @@
     default="specification/",
     help="Path to the specification directory (used for specification hash)"
 )
+@click.option(
+    "--state-path",
+    default=None,
+    type=click.Path(exists=True),
+    help="Path to state.json for stable ordered resource list"
+)
 @click.option(
     "--reprocess",
     is_flag=True,
@@ -176,7 +183,7 @@
     is_flag=True,
     help="Enable debug logging"
 )
-def run_command(collection_dir, bucket, base_url, collection_name, dataset, offset, limit, dataset_resource_dir, pipeline_dir, specification_dir, reprocess, max_threads, quiet, debug):
+def run_command(collection_dir, bucket, base_url, collection_name, dataset, offset, limit, dataset_resource_dir, pipeline_dir, specification_dir, state_path, reprocess, max_threads, quiet, debug):
     """Download resources for a collection from S3 or HTTP(S) URLs.

     Either --bucket or --base-url must be provided.
@@ -218,6 +225,7 @@ def run_command(collection_dir, bucket, base_url, collection_name, dataset, offs
             specification_dir=specification_dir,
             reprocess=reprocess,
             max_threads=max_threads,
+            state_path=state_path,
         )
         click.echo("Download complete!")
     except ValueError as e:
diff --git a/bin/transform.sh b/bin/transform.sh
index edd766b..6d2fc3c 100755
--- a/bin/transform.sh
+++ b/bin/transform.sh
@@ -38,10 +38,34 @@ make init
 echo "Step 3: Building collection database..."
 make collection

-# Step 4: Download dataset resource logs (used to skip already up-to-date resources)
+# Step 4: Download state.json (used for stable batch ordering) and dataset resource logs
+# Use local state.json if it exists, allowing manual edits
+STATE_PATH=${STATE_PATH:-"state.json"}
+
+if [ -f "$STATE_PATH" ]; then
+    echo "Step 4a: Using existing state.json at $STATE_PATH"
+else
+    echo "Step 4a: Downloading state.json..."
+    if [ -n "$COLLECTION_DATASET_BUCKET_NAME" ]; then
+        aws s3 cp "s3://${COLLECTION_DATASET_BUCKET_NAME}/${COLLECTION_NAME}-collection/state.json" "$STATE_PATH"
+    elif [ -n "$DATASTORE_URL" ]; then
+        base=$(echo "$DATASTORE_URL" | sed 's:/*$::')
+        curl --fail -o "$STATE_PATH" "${base}/${COLLECTION_NAME}-collection/state.json"
+    else
+        echo "Error: Either COLLECTION_DATASET_BUCKET_NAME or DATASTORE_URL must be set"
+        exit 1
+    fi
+
+    if [ ! -f "$STATE_PATH" ]; then
+        echo "Error: Failed to download state.json"
+        exit 1
+    fi
+fi
+
+# Download dataset resource logs (used to skip already up-to-date resources within a batch)
 # Skipped when REPROCESS is set - logs will be freshly written by the transform step
 if [ -z "$REPROCESS" ]; then
-    echo "Step 4: Downloading dataset resource logs..."
+    echo "Step 4b: Downloading dataset resource logs..."

     DATASET_RESOURCE_CMD="python bin/download_dataset_resource.py --collection-dir $COLLECTION_DIR --collection-name $COLLECTION_NAME --dataset-resource-dir $DATASET_RESOURCE_DIR"

@@ -61,14 +85,14 @@
     echo "Command: $DATASET_RESOURCE_CMD"
     eval $DATASET_RESOURCE_CMD
 else
-    echo "Step 4: Skipping dataset resource log download - REPROCESS is set, logs will be written fresh"
+    echo "Step 4b: Skipping dataset resource log download - REPROCESS is set, logs will be written fresh"
 fi

 # Step 5: Download resources
 echo "Step 5: Downloading resources..."

 # Build the download command
-DOWNLOAD_CMD="python bin/download_resources.py --collection-dir $COLLECTION_DIR"
+DOWNLOAD_CMD="python bin/download_resources.py --collection-dir $COLLECTION_DIR --state-path $STATE_PATH"

 # Add bucket or base URL (bucket takes precedence, matching makefile convention)
 if [ -n "$COLLECTION_DATASET_BUCKET_NAME" ]; then
@@ -111,7 +135,7 @@ echo "Resources downloaded successfully"

 # Step 6: Transform resources using Python multiprocessing
 echo "Step 6: Transforming resources..."
-TRANSFORM_CMD="python bin/transform_resources.py --collection-dir $COLLECTION_DIR" +TRANSFORM_CMD="python bin/transform_resources.py --collection-dir $COLLECTION_DIR --state-path $STATE_PATH" # Add directory parameters TRANSFORM_CMD="$TRANSFORM_CMD --pipeline-dir $PIPELINE_DIR" diff --git a/bin/transform_resources.py b/bin/transform_resources.py index de3f93f..ccccbac 100755 --- a/bin/transform_resources.py +++ b/bin/transform_resources.py @@ -87,6 +87,12 @@ default="specification/", help="Path to the specification directory" ) +@click.option( + "--state-path", + default=None, + type=click.Path(exists=True), + help="Path to state.json for stable ordered resource list" +) @click.option( "--reprocess", is_flag=True, @@ -119,6 +125,7 @@ def run_command( offset, limit, max_workers, + state_path, reprocess, quiet, debug @@ -148,6 +155,7 @@ def run_command( offset=offset, limit=limit, max_workers=max_workers, + state_path=state_path, reprocess=reprocess, ) diff --git a/src/collection_task/filtering.py b/src/collection_task/filtering.py index 8b78057..faf9bc7 100644 --- a/src/collection_task/filtering.py +++ b/src/collection_task/filtering.py @@ -1,5 +1,6 @@ """Filtering and resource management functions for collection tasks.""" +import json import logging from typing import Dict, List, Optional, Tuple @@ -89,6 +90,31 @@ def apply_offset_and_limit( return dataset_resource_pairs +def load_state_resources(state_path: str, dataset: str) -> List[Tuple[str, str]]: + """Load the ordered resource list for a dataset from a state.json file. + + Args: + state_path: Path to state.json + dataset: Dataset name to load resources for + + Returns: + List of (dataset, resource) tuples in the order defined by state.json + + Raises: + FileNotFoundError: If state_path does not exist + KeyError: If state.json does not contain 'transform_resources' or dataset is not found + """ + with open(state_path) as f: + state = json.load(f) + + transform_resources = state["transform_resources"] + + if dataset not in transform_resources: + raise KeyError(f"Dataset '{dataset}' not found in state.json transform_resources") + + return [(dataset, resource) for resource in transform_resources[dataset]] + + def select_resources_to_process( dataset_resource_map: Dict[str, List[str]], dataset_resource_dir: str, @@ -98,16 +124,18 @@ def select_resources_to_process( offset: Optional[int] = None, limit: Optional[int] = None, reprocess: bool = False, + state_path: Optional[str] = None, ) -> List[Tuple[str, str]]: """Select the (dataset, resource) pairs to process or download. - Applies the same logic in both download_resources and transform_resources - so the two steps always operate on the same set of resources: + If state_path is provided, the stable ordered list from state.json is used + as the base for offset/limit. Otherwise falls back to building from + dataset_resource_map. The skip check (resource_needs_processing) is always + applied after offset/limit so batch boundaries remain stable. - 1. Build the full list of (dataset, resource) pairs - 2. If not reprocess: filter to only those whose dataset-resource log is - out-of-date (different code version, config hash, or spec hash) - 3. Apply offset/limit to the resulting list + 1. Build the ordered list (from state.json if available, else dataset_resource_map) + 2. Apply offset/limit to get a stable batch slice + 3. 
If not reprocess: filter the slice to only those needing processing Args: dataset_resource_map: Dictionary mapping datasets to lists of resources @@ -115,14 +143,23 @@ def select_resources_to_process( pipeline_dir: Path to pipeline config directory (used for config hash) specification_dir: Path to specification directory (used for spec hash) dataset: Optional dataset name to filter to - offset: Optional offset into the final list + offset: Optional offset into the stable ordered list limit: Optional maximum number of pairs to return reprocess: If True, skip the dataset-resource log check + state_path: Optional path to state.json for stable ordered resource list Returns: List of (dataset, resource) tuples to process """ - pairs = build_dataset_resource_pairs(dataset_resource_map, dataset=dataset) + if state_path: + if not dataset: + raise ValueError("dataset is required when state_path is provided") + pairs = load_state_resources(state_path, dataset=dataset) + else: + pairs = build_dataset_resource_pairs(dataset_resource_map, dataset=dataset) + + # Apply offset/limit to stable list first so batch boundaries never shift + pairs = apply_offset_and_limit(pairs, offset=offset, limit=limit, dataset=dataset) if not reprocess: config_hash = hash_directory(pipeline_dir) @@ -141,7 +178,7 @@ def select_resources_to_process( f"{len(pairs)} to process" ) - return apply_offset_and_limit(pairs, offset=offset, limit=limit, dataset=dataset) + return pairs def build_retired_resources_set(old_resource_entries: List[Dict]) -> set: diff --git a/src/collection_task/transform.py b/src/collection_task/transform.py index e17da43..c7f1bd5 100644 --- a/src/collection_task/transform.py +++ b/src/collection_task/transform.py @@ -114,6 +114,7 @@ def process_resources( limit=None, max_workers=None, reprocess=False, + state_path=None, ): """Process resources using multiprocessing. @@ -149,6 +150,7 @@ def process_resources( offset=offset, limit=limit, reprocess=reprocess, + state_path=state_path, ) tasks = []
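
Notes: worked examples (illustrative, not part of the patch)

The new load_state_resources helper reads only the "transform_resources" key of state.json, which must map each dataset name to an ordered list of resources. A minimal sketch of a compatible file, using made-up resource names and assuming the package imports as collection_task:

    import json

    from collection_task.filtering import load_state_resources

    # Hypothetical state.json content: real files may carry extra keys,
    # which this helper ignores - only "transform_resources" is read.
    state = {
        "transform_resources": {
            "conservation-area": ["resource-a", "resource-b", "resource-c"],
        }
    }

    with open("state.json", "w") as f:
        json.dump(state, f)

    # The order of the returned pairs mirrors the order in the file.
    pairs = load_state_resources("state.json", dataset="conservation-area")
    assert pairs == [
        ("conservation-area", "resource-a"),
        ("conservation-area", "resource-b"),
        ("conservation-area", "resource-c"),
    ]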
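Slicing before the skip check is what keeps batches stable: offset/limit now address a fixed window of the ordered list, while the old filter-then-slice order let later batches drift whenever earlier resources became up to date. A toy comparison of the two orderings (plain Python; it assumes apply_offset_and_limit slices roughly like pairs[offset:offset + limit], which this diff does not show):

    # Ten resources in stable order; suppose r0-r4 are already up to date.
    pairs = [("dataset", f"r{i}") for i in range(10)]
    up_to_date = {"r0", "r1", "r2", "r3", "r4"}

    def needs_processing(pair):
        return pair[1] not in up_to_date

    offset, limit = 5, 5

    # Old order (filter, then slice): the filtered list has only five
    # entries, so the window at offset 5 is empty - the batch boundaries
    # have shifted relative to the full resource list.
    filtered = [p for p in pairs if needs_processing(p)]
    print(filtered[offset:offset + limit])  # []

    # New order (slice the stable list, then filter within the slice):
    # the second batch still covers r5-r9, exactly as the list orders them.
    batch = pairs[offset:offset + limit]
    print([p for p in batch if needs_processing(p)])  # all five of r5-r9

With the values in .env.example (TRANSFORMATION_OFFSET=1200, TRANSFORMATION_LIMIT=200), a run therefore always addresses entries 1200-1399 of the dataset's stable list, however many of those still need work.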
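Because state.json is keyed by dataset, select_resources_to_process now insists on a dataset whenever state_path is given, and the guard fires before any file or hash work. A quick sketch (same import-path assumption as above):

    from collection_task.filtering import select_resources_to_process

    try:
        select_resources_to_process(
            dataset_resource_map={},
            dataset_resource_dir="var/dataset-resource/",
            pipeline_dir="pipeline/",
            specification_dir="specification/",
            state_path="state.json",  # but no dataset supplied
        )
    except ValueError as e:
        print(e)  # dataset is required when state_path is provided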