Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NSSP secondary source #2074

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
26 changes: 26 additions & 0 deletions nssp/delphi_nssp/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,29 @@
"fips": str,
}
)

# Maps columns of the secondary (7mra-9cq9) dataset to the indicator's
# standard column names.
SECONDARY_COLS_MAP = {
    "week_end": "timestamp",
    "geography": "geo_value",
    "percent_visits": "val",
    "pathogen": "signal",
}

# Maps raw pathogen names from the secondary dataset to published signal names.
SECONDARY_SIGNALS_MAP = {
    "COVID-19": "pct_ed_visits_covid_secondary",
    "INFLUENZA": "pct_ed_visits_influenza_secondary",
    "RSV": "pct_ed_visits_rsv_secondary",
    "Combined": "pct_ed_visits_combined_secondary",
}

# All published secondary signal names (order follows SECONDARY_SIGNALS_MAP).
SECONDARY_SIGNALS = list(SECONDARY_SIGNALS_MAP.values())
# Geo levels available in the secondary dataset.
SECONDARY_GEOS = ["state", "nation", "hhs"]

# Expected dtype for each column kept from the secondary dataset.
SECONDARY_TYPE_DICT = {
    "timestamp": "datetime64[ns]",
    "geo_value": str,
    "val": float,
    "geo_type": str,
    "signal": str,
}
# Columns retained in the output frame (order follows SECONDARY_TYPE_DICT).
SECONDARY_KEEP_COLS = list(SECONDARY_TYPE_DICT.keys())
100 changes: 84 additions & 16 deletions nssp/delphi_nssp/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@
import pandas as pd
from sodapy import Socrata

from .constants import NEWLINE, SIGNALS, SIGNALS_MAP, TYPE_DICT
from .constants import (
NEWLINE,
SECONDARY_COLS_MAP,
SECONDARY_KEEP_COLS,
SECONDARY_SIGNALS_MAP,
SECONDARY_TYPE_DICT,
SIGNALS,
SIGNALS_MAP,
TYPE_DICT,
)


def warn_string(df, type_dict):
Expand All @@ -27,38 +36,50 @@ def warn_string(df, type_dict):
return warn


def pull_nssp_data(socrata_token: str):
"""Pull the latest NSSP ER visits data, and conforms it into a dataset.

The output dataset has:

- Each row corresponds to a single observation
- Each row additionally has columns for the signals in SIGNALS
def pull_with_socrata_api(socrata_token: str, dataset_id: str):
    """Pull all rows of a CDC dataset from the Socrata API.

    Pages through the dataset in chunks of 50,000 rows (the maximum
    allowed by SODA 2.0) until an empty page signals the end of the data.

    Parameters
    ----------
    socrata_token: str
        My App Token for pulling the NSSP data (could be the same as the nchs data)
    dataset_id: str
        The dataset id to pull data from

    Returns
    -------
    list of dictionaries, each representing a row in the dataset
    """
    client = Socrata("data.cdc.gov", socrata_token)
    results = []
    offset = 0
    limit = 50000  # maximum limit allowed by SODA 2.0
    while True:
        page = client.get(dataset_id, limit=limit, offset=offset)
        if not page:
            break  # exit the loop if no more results
        results.extend(page)
        offset += limit
    # NOTE(review): dropped an unused `pd.DataFrame.from_records(results)`
    # assignment left over from the refactor; callers build their own frames
    # from the returned records.
    return results


def pull_nssp_data(socrata_token: str):
"""Pull the latest NSSP ER visits primary dataset.

https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview

Parameters
----------
socrata_token: str
My App Token for pulling the NSSP data (could be the same as the nchs data)

Returns
-------
pd.DataFrame
Dataframe as described above.
"""
socrata_results = pull_with_socrata_api(socrata_token, "rdmq-nq56")
df_ervisits = pd.DataFrame.from_records(socrata_results)
df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)

Expand All @@ -72,3 +93,50 @@ def pull_nssp_data(socrata_token: str):

keep_columns = ["timestamp", "geography", "county", "fips"]
return df_ervisits[SIGNALS + keep_columns]


def secondary_pull_nssp_data(socrata_token: str):
    """Pull the latest NSSP ER visits secondary dataset.

    https://data.cdc.gov/Public-Health-Surveillance/2023-Respiratory-Virus-Response-NSSP-Emergency-Dep/7mra-9cq9/data_preview

    The output dataset has:

    - Each row corresponds to a single observation

    Parameters
    ----------
    socrata_token: str
        My App Token for pulling the NSSP data (could be the same as the nchs data)

    Returns
    -------
    pd.DataFrame
        Dataframe as described above.
    """
    socrata_results = pull_with_socrata_api(socrata_token, "7mra-9cq9")
    df_ervisits = pd.DataFrame.from_records(socrata_results)
    df_ervisits = df_ervisits.rename(columns=SECONDARY_COLS_MAP)

    # geo_type is not provided in the dataset, so we infer it from the geo_value,
    # which is either a state name, "National", or an HHS region ("Region 1"..."Region 10").
    df_ervisits["geo_type"] = "state"
    df_ervisits.loc[df_ervisits["geo_value"] == "National", "geo_type"] = "nation"

    hhs_region_mask = df_ervisits["geo_value"].str.startswith("Region ")
    # regex=False: treat "Region " as a literal prefix to strip; behavior is
    # identical but explicit, and avoids pandas' deprecated default-regex path.
    df_ervisits.loc[hhs_region_mask, "geo_value"] = df_ervisits.loc[hhs_region_mask, "geo_value"].str.replace(
        "Region ", "", regex=False
    )
    df_ervisits.loc[hhs_region_mask, "geo_type"] = "hhs"

    # Map raw pathogen names to published signal names. Pathogens not in
    # SECONDARY_SIGNALS_MAP become NaN here -- TODO confirm downstream
    # export drops or flags unmapped pathogens.
    df_ervisits["signal"] = df_ervisits["signal"].map(SECONDARY_SIGNALS_MAP)

    df_ervisits = df_ervisits[SECONDARY_KEEP_COLS]

    try:
        df_ervisits = df_ervisits.astype(SECONDARY_TYPE_DICT)
    except KeyError as exc:
        raise ValueError(warn_string(df_ervisits, SECONDARY_TYPE_DICT)) from exc

    return df_ervisits
47 changes: 45 additions & 2 deletions nssp/delphi_nssp/run.py
minhkhul marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- "cache_dir": str, directory of locally cached data
"""

import sys
import time
from datetime import datetime

Expand All @@ -31,8 +32,8 @@
from delphi_utils.geomap import GeoMapper
from delphi_utils.nancodes import add_default_nancodes

from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SIGNALS
from .pull import pull_nssp_data
from .constants import AUXILIARY_COLS, CSV_COLS, GEOS, SECONDARY_GEOS, SECONDARY_SIGNALS, SIGNALS
from .pull import pull_nssp_data, secondary_pull_nssp_data


def add_needed_columns(df, col_names=None):
Expand Down Expand Up @@ -81,6 +82,7 @@ def run_module(params):
socrata_token = params["indicator"]["socrata_token"]

run_stats = []

## build the base version of the signal at the most detailed geo level you can get.
## compute stuff here or farm out to another function or file
df_pull = pull_nssp_data(socrata_token)
minhkhul marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -137,5 +139,46 @@ def run_module(params):
if len(dates) > 0:
run_stats.append((max(dates), len(dates)))

secondary_df_pull = secondary_pull_nssp_data(socrata_token)
## aggregate
geo_mapper = GeoMapper()
minhkhul marked this conversation as resolved.
Show resolved Hide resolved
for signal in SECONDARY_SIGNALS:
minhkhul marked this conversation as resolved.
Show resolved Hide resolved
for geo in SECONDARY_GEOS:
df = secondary_df_pull.copy()
logger.info("Generating signal and exporting to CSV", geo_type=geo, signal=signal)
if geo == "state":
df = df[(df["geo_type"] == "state")]
df["geo_id"] = df["geo_value"].apply(
lambda x: (
us.states.lookup(x).abbr.lower()
if us.states.lookup(x)
else ("dc" if x == "District of Columbia" else x)
)
)
unexpected_state_names = df[df["geo_id"] == df["geo_value"]]
if unexpected_state_names.shape[0] > 0:
logger.error("Unexpected state names", df=unexpected_state_names)
minhkhul marked this conversation as resolved.
Show resolved Hide resolved
sys.exit(1)
elif geo == "nation":
df = df[(df["geo_type"] == "nation")]
df["geo_id"] = "us"
elif geo == "hhs":
df = df[(df["geo_type"] == "hhs")]
df["geo_id"] = df["geo_type"]
minhkhul marked this conversation as resolved.
Show resolved Hide resolved
# add se, sample_size, and na codes
missing_cols = set(CSV_COLS) - set(df.columns)
df = add_needed_columns(df, col_names=list(missing_cols))
df_csv = df[CSV_COLS + ["timestamp"]]
# actual export
dates = create_export_csv(
df_csv,
geo_res=geo,
export_dir=export_dir,
sensor=signal,
weekly_dates=True,
)
if len(dates) > 0:
run_stats.append((max(dates), len(dates)))

## log this indicator run
logging(start_time, run_stats, logger)
Loading
Loading