Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# CDCWonder_NNDSS_Infectious_Weekly

## Overview
Notifiable Infectious Diseases Data: weekly tables from CDC WONDER containing the incident counts of different infectious diseases per week, over the previous 52 weeks, as reported by the 50 states, New York City, the District of Columbia, and the U.S. territories.

## Data Source
**Source URL:**
`https://data.cdc.gov/api/views/x9gk-5huc/rows.csv?accessType=DOWNLOAD&api_foundry=true`

## How To Download Input Data
To download and process the data, you'll need to run the provided preprocess script, `preprocess.py`. This script will automatically create an "input_files" folder where you should place the file to be processed. The script also adds one more column, 'observationDate', to the input files.

statvars: Infectious Diseases

## Download the data:
To download and preprocess the source data, run:
```python3 preprocess.py```

## Processing Instructions
To process data and generate statistical variables, use the following command from the "data" directory:

**For Test Data Run**
```
python3 tools/statvar_importer/stat_var_processor.py \
--input_data=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/testdata/NNDSS_Weekly_Data.csv \
--pv_map=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/nndss_weekly_pvmap.csv \
--config_file=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/nndss_weekly_metadata.csv \
--output_path=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/testdata/nndss_weekly_output
```

**For Main data run**
```bash
python3 tools/statvar_importer/stat_var_processor.py \
--input_data=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/input_files/NNDSS_Weekly_Data.csv \
--pv_map=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/nndss_weekly_pvmap.csv \
--config_file=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/nndss_weekly_metadata.csv \
--existing_statvar_mcf=gs://unresolved_mcf/scripts/statvar/stat_vars.mcf \
--output_path=statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/output/nndss_weekly_output
```
36 changes: 36 additions & 0 deletions statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{
"import_specifications": [
{
"import_name": "CDCWonder_NNDSS_Infectious_Weekly",
"curator_emails": [
"support@datacommons.org"
],
"provenance_url": "https://data.cdc.gov/api/views/x9gk-5huc/rows.csv?accessType=DOWNLOAD&api_foundry=true",
"provenance_description": "Notifiable Infectious Diseases Data: Weekly tables from CDC WONDER which has the incident counts of different infectious diseases per week that are reported by the 50 states, New York City, the District of Columbia, and the U.S. territories.",
"scripts": [
"preprocess.py",
"../../../tools/statvar_importer/stat_var_processor.py --input_data=input_files/NNDSS_Weekly_Data.csv --pv_map='nndss_weekly_pvmap.csv' --config_file=nndss_weekly_metadata.csv --output_path=output/nndss_weekly_output"
],
"import_inputs": [
{
"template_mcf": "output/nndss_weekly_output.tmcf",
"cleaned_csv": "output/nndss_weekly_output.csv",
"node_mcf": "output/*.mcf"
}
],
"source_files": [
"input_files/NNDSS_Weekly_Data.csv"
],
"cron_schedule": "00 11 1,15 * *",
"resource_limits": {"cpu": 8, "memory": 32, "disk": 100}
}
],
"config_override": {
"invoke_import_validation": true,
"invoke_import_tool": true,
"invoke_differ_tool": true,
"skip_input_upload": false,
"skip_gcs_upload": false,
"cleanup_gcs_volume_mount": false
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
parameter,val
mapped_rows,1
mapped_columns,5
header_rows,1
#places_resolved_csv,
input_columns,8
#input_rows,1000

Large diffs are not rendered by default.

129 changes: 129 additions & 0 deletions statvar_imports/cdc/CDCWonder_NNDSS_InfectiousWeekly/preprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Copyright 2025 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");

import os, sys
import pandas as pd
from absl import app, logging
from pathlib import Path
import datetime
import importlib.util
import shutil

# Resolve the repo-relative path to the shared download helper and load it
# dynamically, since this script lives outside the util/ package tree and
# cannot use a normal package import.
script_dir = os.path.dirname(os.path.abspath(__file__))
util_script_path = os.path.abspath(os.path.join(script_dir, '../../../util/download_util_script.py'))
spec = importlib.util.spec_from_file_location('download_util_script', util_script_path)
if spec is None or spec.loader is None:
    raise ImportError(f'Could not load download_util_script from {util_script_path}')
download_util_script = importlib.util.module_from_spec(spec)
spec.loader.exec_module(download_util_script)
download_file = download_util_script.download_file
# Working directory and file names for the download/transform pipeline.
INPUT_DIR = os.path.join(script_dir, "input_files")
Path(INPUT_DIR).mkdir(parents=True, exist_ok=True)
INPUT_FILE = os.path.join(INPUT_DIR, "rows.csv")  # raw download name
NEW_FILE = os.path.join(INPUT_DIR, "NNDSS_Weekly_Data.csv")  # final processed name
SOURCE_URL = "https://data.cdc.gov/api/views/x9gk-5huc/rows.csv?accessType=DOWNLOAD&api_foundry=true"

def _start_date_of_year(year: int) -> datetime.date:
    """Return the Sunday on which MMWR week 1 of *year* begins.

    Per CDC convention, MMWR week 1 is the first Sunday-to-Saturday week
    that contains at least four days of the new calendar year.
    """
    jan_first = datetime.date(year, 1, 1)
    weekday = jan_first.isoweekday()  # Mon=1 .. Sun=7
    # Jan 1 on Thu-Sun (weekday > 3): week 1 starts the following Sunday;
    # Jan 1 on Mon-Wed: week 1 starts the preceding Sunday.
    offset = (7 - weekday) if weekday > 3 else -weekday
    return jan_first + datetime.timedelta(days=offset)


def get_mmwr_week_start_date(year, week) -> datetime.date:
    """Return the start date (a Sunday) of the given MMWR *week* in *year*."""
    return _start_date_of_year(year) + datetime.timedelta(weeks=week - 1)

def preprocess_data(filepath: str):
    """Add an 'observationDate' column to the CSV at *filepath*, in place.

    The file is streamed in 100k-row chunks to bound memory use. For every
    row, the MMWR week start date is derived from the 'Current MMWR Year'
    and 'MMWR WEEK' columns and written to a new 'observationDate' column
    inserted immediately after 'MMWR WEEK'. Output accumulates in a temp
    file that replaces the original only after all chunks succeed.

    Raises:
        RuntimeError: if reading, transforming, or writing the data fails
            (the partial temp file is removed first).
    """
    temp_filepath = filepath + ".tmp"
    chunk_size = 100000
    first_chunk = True
    chunk_count = 0

    try:
        print(f"DEBUG: Opening pandas reader on {filepath}...")

        # Safety flags: low_memory=False and on_bad_lines='skip'
        # to prevent C-level parser crashes on malformed rows.
        reader = pd.read_csv(filepath, chunksize=chunk_size, low_memory=False, on_bad_lines='skip')

        for chunk in reader:
            chunk_count += 1
            print(f"DEBUG: Processing chunk {chunk_count}...")

            if first_chunk:
                # Validate the schema once, up front, so a renamed source
                # column fails fast instead of mid-file.
                required_cols = ['Current MMWR Year', 'MMWR WEEK']
                if not all(col in chunk.columns for col in required_cols):
                    raise KeyError(f"The file must contain the columns: {required_cols}.")

            chunk['observationDate'] = chunk.apply(
                lambda row: get_mmwr_week_start_date(row['Current MMWR Year'], row['MMWR WEEK']),
                axis=1
            )

            # Move the freshly appended (last) column so it sits right
            # after 'MMWR WEEK'.
            cols = list(chunk.columns)
            mmwr_week_index = cols.index('MMWR WEEK')
            observation_date_col = cols.pop()
            cols.insert(mmwr_week_index + 1, observation_date_col)
            chunk = chunk[cols]

            # Only the first chunk writes the header; later chunks append.
            chunk.to_csv(temp_filepath, mode='a' if not first_chunk else 'w',
                         header=first_chunk, index=False)
            first_chunk = False

        print("DEBUG: All chunks processed. Moving temp file...")
        shutil.move(temp_filepath, filepath)
        print(f"Success: File '{filepath}' updated safely.")

    except Exception as e:
        # Remove the partial output so a retry starts from scratch.
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)
        print(f"CRASH: Error during Pandas processing: {e}")
        # NOTE: absl's logging.fatal() terminates the process, which made the
        # raise below unreachable in the original; log at error level instead
        # and let the caller decide how to handle the failure.
        logging.error(f"An unexpected error occurred: {e}")
        raise RuntimeError(f"Import job failed An unexpected error occurred: {e}") from e

def main(argv):
    """Download the NNDSS weekly CSV, add observationDate, and rename it.

    Pipeline: download rows.csv into input_files/, sanity-check that the
    download exists and is not a tiny error page, stream-process it via
    preprocess_data(), then rename the result to NNDSS_Weekly_Data.csv for
    the downstream stat_var_processor step.

    Raises:
        RuntimeError: if the source download fails.
    """
    print("DEBUG: Starting download phase...")
    try:
        download_file(url=SOURCE_URL,
                      output_folder=INPUT_DIR,
                      unzip=False,
                      headers=None,
                      tries=3,
                      delay=5,
                      backoff=2)
        print("DEBUG: Download function completed.")
    except Exception as e:
        print(f"CRASH: Failed during download: {e}")
        # NOTE: absl's logging.fatal() terminates the process, which made the
        # raise below unreachable in the original; log at error level instead
        # so the RuntimeError actually propagates.
        logging.error(f"Failed to download NNDSS weekly data file,{e}")
        raise RuntimeError(f"Failed to download NNDSS weekly data file,{e}") from e

    # Check if the file actually downloaded and check its size.
    if not os.path.exists(INPUT_FILE):
        print("CRASH: The file 'rows.csv' was never downloaded.")
        sys.exit(1)

    file_size_mb = os.path.getsize(INPUT_FILE) / (1024 * 1024)
    print(f"DEBUG: Downloaded file size is {file_size_mb:.2f} MB.")

    # Prevent pandas from processing tiny error files: a sub-100KB response
    # is almost certainly an HTML error page, not the dataset.
    if file_size_mb < 0.1:
        print("CRASH: File is suspiciously small! CDC likely returned an HTML error page.")
        # errors='replace' so previewing a binary/garbled response cannot
        # itself raise UnicodeDecodeError.
        with open(INPUT_FILE, 'r', encoding='utf-8', errors='replace') as f:
            print(f"Preview of bad file:\n{f.read(500)}")
        sys.exit(1)

    print("DEBUG: Handing off to Pandas chunker...")
    preprocess_data(INPUT_FILE)

    print("DEBUG: Renaming final file...")
    try:
        if os.path.exists(INPUT_FILE):
            # os.replace overwrites any stale NEW_FILE atomically, avoiding
            # the remove-then-rename race of the original two-step approach.
            os.replace(INPUT_FILE, NEW_FILE)
            print("DEBUG: Successfully renamed file.")
    except Exception as e:
        print(f"CRASH: Failed to rename file: {e}")
        sys.exit(1)

if __name__ == "__main__":
    # absl's app.run parses command-line flags and then invokes main(argv).
    app.run(main)
Loading
Loading